diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 4c8551da95..b22b4c813e 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -23,8 +23,6 @@ import ( "hash" "io" "sync" - // "github.com/ethereum/go-ethereum/logger" - // "github.com/ethereum/go-ethereum/logger/glog" ) /* @@ -124,12 +122,13 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s jobC := make(chan *hashJob, 2*processors) wg := &sync.WaitGroup{} errC := make(chan error) + quitC := make(chan bool) // wwg = workers waitgroup keeps track of hashworkers spawned by this split call if wwg != nil { wwg.Add(1) } - go self.hashWorker(jobC, chunkC, errC, swg, wwg) + go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) depth := 0 treeSize := self.chunkSize @@ -141,11 +140,10 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s } key := make([]byte, self.hashFunc().Size()) - // glog.V(logger.Detail).Infof("split request received for data (%v bytes, depth: %v)", size, depth) // this waitgroup member is released after the root hash is calculated wg.Add(1) //launch actual recursive function passing the waitgroups - go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, wg, swg, wwg) + go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, swg, wwg) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -153,7 +151,6 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s wg.Wait() // if storage waitgroup is non-nil, we wait for storage to finish too if swg != nil { - // glog.V(logger.Detail).Infof("Waiting for storage to finish") swg.Wait() } close(errC) @@ -162,14 +159,15 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s select { case err := <-errC: if err != nil { + close(quitC) return nil, err } - // + //TODO: add a timeout } return key, nil } -func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, parentWg, swg, wwg *sync.WaitGroup) { +func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) { for depth > 0 && size < treeSize { treeSize /= self.branches @@ -180,17 +178,20 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade // leaf nodes -> content chunks chunkData := make([]byte, size+8) binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size)) - data.Read(chunkData[8:]) + _, err := data.Read(chunkData[8:]) + if err != nil { + errC <- err + return + } select { case jobC <- &hashJob{key, chunkData, size, parentWg}: - case <-errC: + case <-quitC: } - // glog.V(logger.Detail).Infof("read %v", size) return } + // dept > 0 // intermediate chunk containing child nodes hashes branchCnt := int64((size + treeSize - 1) / treeSize) - // glog.V(logger.Detail).Infof("intermediate node: setting branches: %v, depth: %v, max subtree size: %v, data size: %v", branches, depth, treeSize, size) var chunk []byte = make([]byte, branchCnt*self.hashSize+8) var pos, i int64 @@ -210,7 +211,7 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize] childrenWg.Add(1) - self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, childrenWg, swg, wwg) + self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, swg, wwg) i++ pos += treeSize @@ -224,15 +225,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade wwg.Add(1) } self.workerCount++ - go self.hashWorker(jobC, chunkC, errC, swg, wwg) + go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) } select { case jobC <- &hashJob{key, chunk, size, parentWg}: - case <-errC: + case <-quitC: } } -func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, swg, wwg *sync.WaitGroup) { +func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) { hasher := self.hashFunc() if wwg != nil { defer wwg.Done() @@ -247,8 +248,7 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC // now we got the hashes in the chunk, then hash the chunks hasher.Reset() self.hashChunk(hasher, job, chunkC, swg) - // glog.V(logger.Detail).Infof("hash chunk (%v)", job.size) - case <-errC: + case <-quitC: return } } @@ -276,6 +276,7 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan * } } job.parentWg.Done() + if chunkC != nil { chunkC <- newChunk } @@ -328,7 +329,6 @@ func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { // this is correct, a swarm doc cannot be zero length, so no EOF is expected if len(b) == 0 { - // glog.V(logger.Detail).Infof("Size query for %v", chunk.Key.Log()) return 0, nil } quitC := make(chan bool) @@ -336,13 +336,10 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { if err != nil { return 0, err } - // glog.V(logger.Detail).Infof("readAt: len(b): %v, off: %v, size: %v ", len(b), off, size) errC := make(chan error) - // glog.V(logger.Detail).Infof("readAt: reading %v into %d bytes at offset %d.", self.chunk.Key.Log(), len(b), off) // } - // glog.V(logger.Detail).Infof("-> want: %v, off: %v size: %v ", want, off, self.size) var treeSize int64 var depth int // calculate depth and max treeSize @@ -364,22 +361,16 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { return 0, err } - // glog.V(logger.Detail).Infof("ReadAt received %v", err) - // glog.V(logger.Detail).Infof("end: len(b): %v, off: %v, size: %v ", len(b), off, size) if off+int64(len(b)) >= size { - // glog.V(logger.Detail).Infof(" len(b): %v EOF", len(b)) return len(b), io.EOF } - // glog.V(logger.Detail).Infof("ReadAt returning at %d: %v", read, err) return len(b), nil } func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // return NewDPA(&LocalStore{}) - // glog.V(logger.Detail).Infof("inh len(b): %v, off: %v eoff: %v ", len(b), off, eoff) - // glog.V(logger.Detail).Infof("depth: %v, loff: %v, eoff: %v, chunk.Size: %v, treeSize: %v", depth, off, eoff, chunk.Size, treeSize) // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) @@ -391,7 +382,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr // leaf chunk found if depth == 0 { - // glog.V(logger.Detail).Infof("depth: %v, len(b): %v, off: %v, eoff: %v, chunk.Size: %v %v, treeSize: %v", depth, len(b), off, eoff, chunk.Size, len(chunk.SData), treeSize) extra := 8 + eoff - int64(len(chunk.SData)) if extra > 0 { eoff -= extra @@ -406,7 +396,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr wg := &sync.WaitGroup{} defer wg.Wait() - // glog.V(logger.Detail).Infof("start %v,end %v", start, end) for i := start; i < end; i++ { soff := i * treeSize @@ -425,7 +414,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr wg.Add(1) go func(j int64) { childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize] - // glog.V(logger.Detail).Infof("subtree ind.ex: %v -> %v", j, childKey.Log()) chunk := retrieve(childKey, self.chunkC, quitC) if chunk == nil { select { @@ -450,7 +438,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { Key: key, C: make(chan bool), // close channel to signal data delivery } - // glog.V(logger.Detail).Infof("chunk data sent for %v (key interval in chunk %v-%v)", ch.Key.Log(), j*self.chunker.hashSize, (j+1)*self.chunker.hashSize) // submit chunk for retrieval select { case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally) @@ -464,7 +451,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { // this is how we control process leakage (quitC is closed once join is finished (after timeout)) return nil case <-chunk.C: // bells are ringing, data have been delivered - // glog.V(logger.Detail).Infof("chunk data received") } if len(chunk.SData) == 0 { return nil // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) @@ -476,7 +462,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { // Read keeps a cursor so cannot be called simulateously, see ReadAt func (self *LazyChunkReader) Read(b []byte) (read int, err error) { read, err = self.ReadAt(b, self.off) - // glog.V(logger.Detail).Infof("read: %v, off: %v, error: %v", read, self.off, err) self.off += int64(read) return diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index e6ca3d0878..4f05cd1cc3 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "crypto/rand" "encoding/binary" "fmt" "io" @@ -27,6 +28,7 @@ import ( "time" ) + /* Tests TreeChunker by splitting and joining a random byte slice */ @@ -49,7 +51,7 @@ func (self *chunkerTester) checkChunks(t *testing.T, want int) { } } -func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup) (key Key) { +func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup, expectedError error) (key Key) { // reset self.chunks = make(map[string]*Chunk) @@ -65,14 +67,9 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c select { case <-timeout: self.t.Fatalf("Join timeout error") - - case chunk, ok := <-chunkC: - if !ok { - // glog.V(logger.Info).Infof("chunkC closed quitting") - close(quitC) - return - } - // glog.V(logger.Info).Infof("chunk %v received", len(self.chunks)) + case <-quitC: + return + case chunk := <-chunkC: // self.chunks = append(self.chunks, chunk) self.chunks[chunk.Key.String()] = chunk if chunk.wg != nil { @@ -83,21 +80,16 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c }() } key, err := chunker.Split(data, size, chunkC, swg, nil) - if err != nil { + if err != nil && expectedError == nil { self.t.Fatalf("Split error: %v", err) + } else if expectedError != nil && (err == nil || err.Error() != expectedError.Error()) { + self.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err) } if chunkC != nil { if swg != nil { - // glog.V(logger.Info).Infof("Waiting for storage to finish") swg.Wait() - // glog.V(logger.Info).Infof("Storage finished") } - close(chunkC) - } - if chunkC != nil { - // glog.V(logger.Info).Infof("waiting for splitter finished") - <-quitC - // glog.V(logger.Info).Infof("Splitter finished") + close(quitC) } return } @@ -105,11 +97,9 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader { // reset but not the chunks - // glog.V(logger.Info).Infof("Splitter finished") reader := chunker.Join(key, chunkC) timeout := time.After(600 * time.Second) - // glog.V(logger.Info).Infof("Splitter finished") i := 0 go func() { for { @@ -122,15 +112,12 @@ func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Ch close(quitC) return } - // glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String()) // this just mocks the behaviour of a chunk store retrieval stored, success := self.chunks[chunk.Key.String()] - // glog.V(logger.Info).Infof("chunk %v, success: %v", chunk.Key.String(), success) if !success { self.t.Fatalf("not found") return } - // glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String()) chunk.SData = stored.SData chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) close(chunk.C) @@ -141,6 +128,26 @@ func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Ch return reader } +func testRandomBrokenData(splitter Splitter, n int, tester *chunkerTester) { + data := io.LimitReader(rand.Reader, int64(n)) + brokendata := brokenLimitReader(data, n, n/2) + + buf := make([]byte, n) + _, err := brokendata.Read(buf) + if err == nil || err.Error() != "Broken reader" { + tester.t.Fatalf("Broken reader is not broken, hence broken. Returns: %v", err) + } + + data = io.LimitReader(rand.Reader, int64(n)) + brokendata = brokenLimitReader(data, n, n/2) + + chunkC := make(chan *Chunk, 1000) + swg := &sync.WaitGroup{} + + key := tester.Split(splitter, brokendata, int64(n), chunkC, swg, fmt.Errorf("Broken reader")) + tester.t.Logf(" Key = %v\n", key) +} + func testRandomData(splitter Splitter, n int, tester *chunkerTester) { if tester.inputs == nil { tester.inputs = make(map[uint64][]byte) @@ -151,13 +158,13 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) { data, input = testDataReaderAndSlice(n) tester.inputs[uint64(n)] = input } else { - data = limitReader(bytes.NewReader(input), n) + data = io.LimitReader(bytes.NewReader(input), int64(n)) } chunkC := make(chan *Chunk, 1000) swg := &sync.WaitGroup{} - key := tester.Split(splitter, data, int64(n), chunkC, swg) + key := tester.Split(splitter, data, int64(n), chunkC, swg, nil) tester.t.Logf(" Key = %v\n", key) chunkC = make(chan *Chunk, 1000) @@ -166,9 +173,7 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) { chunker := NewTreeChunker(NewChunkerParams()) reader := tester.Join(chunker, key, 0, chunkC, quitC) output := make([]byte, n) - // glog.V(logger.Info).Infof(" Key = %v\n", key) r, err := reader.Read(output) - // glog.V(logger.Info).Infof(" read = %v %v\n", r, err) if r != n || err != io.EOF { tester.t.Fatalf("read error read: %v n = %v err = %v\n", r, n, err) } @@ -183,7 +188,7 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) { func TestRandomData(t *testing.T) { // sizes := []int{123456} - sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 123456} + sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678} tester := &chunkerTester{t: t} chunker := NewTreeChunker(NewChunkerParams()) for _, s := range sizes { @@ -195,6 +200,16 @@ func TestRandomData(t *testing.T) { } } +func TestRandomBrokenData(t *testing.T) { + sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678} + tester := &chunkerTester{t: t} + chunker := NewTreeChunker(NewChunkerParams()) + for _, s := range sizes { + testRandomBrokenData(chunker, s, tester) + t.Logf("done size: %v", s) + } +} + func readAll(reader LazySectionReader, result []byte) { size := int64(len(result)) @@ -227,7 +242,7 @@ func benchmarkJoin(n int, t *testing.B) { chunkC := make(chan *Chunk, 1000) swg := &sync.WaitGroup{} - key := tester.Split(chunker, data, int64(n), chunkC, swg) + key := tester.Split(chunker, data, int64(n), chunkC, swg, nil) // t.StartTimer() chunkC = make(chan *Chunk, 1000) quitC := make(chan bool) @@ -248,8 +263,7 @@ func benchmarkSplitTree(n int, t *testing.B) { chunker := NewTreeChunker(NewChunkerParams()) tester := &chunkerTester{t: t} data := testDataReader(n) - // glog.V(logger.Info).Infof("splitting data of length %v", n) - tester.Split(chunker, data, int64(n), nil, nil) + tester.Split(chunker, data, int64(n), nil, nil, nil) } stats := new(runtime.MemStats) runtime.ReadMemStats(stats) @@ -262,8 +276,7 @@ func benchmarkSplitPyramid(n int, t *testing.B) { splitter := NewPyramidChunker(NewChunkerParams()) tester := &chunkerTester{t: t} data := testDataReader(n) - // glog.V(logger.Info).Infof("splitting data of length %v", n) - tester.Split(splitter, data, int64(n), nil, nil) + tester.Split(splitter, data, int64(n), nil, nil, nil) } stats := new(runtime.MemStats) runtime.ReadMemStats(stats) diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index e81a82b7bf..889b28a70b 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -19,6 +19,7 @@ package storage import ( "bytes" "crypto/rand" + "fmt" "io" "sync" "testing" @@ -27,32 +28,31 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) -type limitedReader struct { - r io.Reader - off int64 - size int64 +type brokenLimitedReader struct { + lr io.Reader + errAt int + off int + size int } -func limitReader(r io.Reader, size int) *limitedReader { - return &limitedReader{r, 0, int64(size)} -} - -func (self *limitedReader) Read(buf []byte) (int, error) { - limit := int64(len(buf)) - left := self.size - self.off - if limit >= left { - limit = left +func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader { + return &brokenLimitedReader{ + lr: data, + errAt: errAt, + size: size, } - n, err := self.r.Read(buf[:limit]) - if err == nil && limit == left { - err = io.EOF - } - self.off += int64(n) - return n, err } func testDataReader(l int) (r io.Reader) { - return limitReader(rand.Reader, l) + return io.LimitReader(rand.Reader, int64(l)) +} + +func (self *brokenLimitedReader) Read(buf []byte) (int, error) { + if self.off+len(buf) > self.errAt { + return 0, fmt.Errorf("Broken reader") + } + self.off += len(buf) + return self.lr.Read(buf) } func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) { @@ -60,7 +60,7 @@ func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) { if _, err := rand.Read(slice); err != nil { panic("rand error") } - r = limitReader(bytes.NewReader(slice), l) + r = io.LimitReader(bytes.NewReader(slice), int64(l)) return } diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 3c1ef17a04..79e1927b94 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -81,7 +81,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk chunks := (size + self.chunkSize - 1) / self.chunkSize depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1 - // glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth) results := Tree{ Chunks: chunks, @@ -99,26 +98,24 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk go self.processor(pend, swg, tasks, chunkC, &results) } // Feed the chunks into the task pool + read := 0 for index := 0; ; index++ { buffer := make([]byte, self.chunkSize+8) n, err := data.Read(buffer[8:]) - last := err == io.ErrUnexpectedEOF || err == io.EOF - // glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth) + read += n + last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF if err != nil && !last { - // glog.V(logger.Info).Infof("error: %v", err) close(abortC) break } binary.LittleEndian.PutUint64(buffer[:8], uint64(n)) pend.Add(1) - // glog.V(logger.Info).Infof("-> task %v (%v)", index, n) select { case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}: case <-abortC: return nil, err } if last { - // glog.V(logger.Info).Infof("last task %v (%v)", index, n) break } } @@ -126,7 +123,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk close(tasks) pend.Wait() - // glog.V(logger.Info).Infof("len: %v", results.Levels[0][0]) key := results.Levels[0][0].Children[0][:] return key, nil } @@ -134,12 +130,10 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) { defer pend.Done() - // glog.V(logger.Info).Infof("processor started") // Start processing leaf chunks ad infinitum hasher := self.hashFunc() for task := range tasks { depth, pow := len(results.Levels)-1, self.branches - // glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last) size := task.Size data := task.Data var node *Node @@ -171,10 +165,8 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas } node = &Node{pending, 0, make([]common.Hash, pending), last} results.Levels[depth][task.Index/pow] = node - // glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending) } node.Pending-- - // glog.V(logger.Info).Infof("pending now: %v", node.Pending) i := task.Index / (pow / self.branches) % self.branches if last { node.Last = true @@ -182,7 +174,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas copy(node.Children[i][:], hash) node.Size += size left := node.Pending - // glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size) if chunkC != nil { if swg != nil { swg.Add(1) @@ -198,7 +189,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas results.Lock.Unlock() // If there's more work to be done, leave for others - // glog.V(logger.Info).Infof("left %v", left) if left > 0 { break }