diff --git a/cmd/swarm/explore.go b/cmd/swarm/explore.go index 5b5b8bf41f..9566213e41 100644 --- a/cmd/swarm/explore.go +++ b/cmd/swarm/explore.go @@ -23,6 +23,7 @@ import ( "os" "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "gopkg.in/urfave/cli.v1" ) @@ -47,7 +48,7 @@ func hashes(ctx *cli.Context) { } defer f.Close() - fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags()) refs, err := fileStore.GetAllReferences(context.TODO(), f, false) if err != nil { utils.Fatalf("%v\n", err) diff --git a/cmd/swarm/hash.go b/cmd/swarm/hash.go index 2df02c0ed7..ff786fa105 100644 --- a/cmd/swarm/hash.go +++ b/cmd/swarm/hash.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/contracts/ens" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "gopkg.in/urfave/cli.v1" ) @@ -77,7 +78,7 @@ func hash(ctx *cli.Context) { defer f.Close() stat, _ := f.Stat() - fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags()) addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false) if err != nil { utils.Fatalf("%v\n", err) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index bbcf66b26b..d6eb87ace4 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -255,7 +255,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { return nil, fmt.Errorf("unable to create temp dir: %v", err) } defer os.RemoveAll(datadir) - fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags()) if err != nil { return nil, err } diff --git a/swarm/api/api.go b/swarm/api/api.go index 86c1119232..96fb86e1cc 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/contracts/ens" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" @@ -53,8 +54,6 @@ import ( var ( apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil) apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil) - apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil) - apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil) apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil) apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil) apiGetHTTP300 = metrics.NewRegisteredCounter("api.get.http.300", nil) @@ -188,15 +187,17 @@ type API struct { feed *feed.Handler fileStore *storage.FileStore dns Resolver + Tags *chunk.Tags Decryptor func(context.Context, string) DecryptFunc } // NewAPI the api constructor initialises a new API instance. -func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) { +func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) { self = &API{ fileStore: fileStore, dns: dns, feed: feedHandler, + Tags: tags, Decryptor: func(ctx context.Context, credentials string) DecryptFunc { return self.doDecrypt(ctx, credentials, pk) }, @@ -297,31 +298,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto return addr, nil } -// Put provides singleton manifest creation on top of FileStore store -func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { - apiPutCount.Inc(1) - r := strings.NewReader(content) - key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt) - if err != nil { - apiPutFail.Inc(1) - return nil, nil, err - } - manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) - r = strings.NewReader(manifest) - key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt) - if err != nil { - apiPutFail.Inc(1) - return nil, nil, err - } - return key, func(ctx context.Context) error { - err := waitContent(ctx) - if err != nil { - return err - } - return waitManifest(ctx) - }, nil -} - // Get uses iterative manifest retrieval and prefix matching // to resolve basePath to content using FileStore retrieve // it returns a section reader, mimeType, status, the key of the actual content and an error diff --git a/swarm/api/api_test.go b/swarm/api/api_test.go index eb896f32aa..4a5f923626 100644 --- a/swarm/api/api_test.go +++ b/swarm/api/api_test.go @@ -19,6 +19,7 @@ package api import ( "bytes" "context" + crand "crypto/rand" "errors" "flag" "fmt" @@ -26,13 +27,16 @@ import ( "io/ioutil" "math/big" "os" + "strings" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/testutil" ) func init() { @@ -41,19 +45,21 @@ func init() { log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true))))) } -func testAPI(t *testing.T, f func(*API, bool)) { - datadir, err := ioutil.TempDir("", "bzz-test") - if err != nil { - t.Fatalf("unable to create temp dir: %v", err) +func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) { + for _, v := range []bool{true, false} { + datadir, err := ioutil.TempDir("", "bzz-test") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(datadir) + tags := chunk.NewTags() + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags) + if err != nil { + return + } + api := NewAPI(fileStore, nil, nil, nil, tags) + f(api, tags, v) } - defer os.RemoveAll(datadir) - fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) - if err != nil { - return - } - api := NewAPI(fileStore, nil, nil, nil) - f(api, false) - f(api, true) } type testResponse struct { @@ -61,6 +67,13 @@ type testResponse struct { *Response } +type Response struct { + MimeType string + Status int + Size int64 + Content string +} + func checkResponse(t *testing.T, resp *testResponse, exp *Response) { if resp.MimeType != exp.MimeType { @@ -111,15 +124,14 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse { } reader.Seek(0, 0) return &testResponse{reader, &Response{mimeType, status, size, string(s)}} - // return &testResponse{reader, &Response{mimeType, status, reader.Size(), nil}} } func TestApiPut(t *testing.T) { - testAPI(t, func(api *API, toEncrypt bool) { + testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) { content := "hello" exp := expResponse(content, "text/plain", 0) ctx := context.TODO() - addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt) + addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -129,6 +141,40 @@ func TestApiPut(t *testing.T) { } resp := testGet(t, api, addr.Hex(), "") checkResponse(t, resp, exp) + tag := tags.All()[0] + testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest + }) +} + +// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input +func TestApiTagLarge(t *testing.T) { + const contentLength = 4096 * 4095 + testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) { + randomContentReader := io.LimitReader(crand.Reader, int64(contentLength)) + tag, err := api.Tags.New("unnamed-tag", 0) + if err != nil { + t.Fatal(err) + } + ctx := sctx.SetTag(context.Background(), tag.Uid) + key, waitContent, err := api.Store(ctx, randomContentReader, int64(contentLength), toEncrypt) + if err != nil { + t.Fatal(err) + } + err = waitContent(ctx) + if err != nil { + t.Fatal(err) + } + tag.DoneSplit(key) + + if toEncrypt { + tag := tags.All()[0] + expect := int64(4095 + 64 + 1) + testutil.CheckTag(t, tag, expect, expect, 0, expect) + } else { + tag := tags.All()[0] + expect := int64(4095 + 32 + 1) + testutil.CheckTag(t, tag, expect, expect, 0, expect) + } }) } @@ -391,7 +437,7 @@ func TestDecryptOriginForbidden(t *testing.T) { Access: &AccessEntry{Type: AccessTypePass}, } - api := NewAPI(nil, nil, nil, nil) + api := NewAPI(nil, nil, nil, nil, chunk.NewTags()) f := api.Decryptor(ctx, "") err := f(me) @@ -425,7 +471,7 @@ func TestDecryptOrigin(t *testing.T) { Access: &AccessEntry{Type: AccessTypePass}, } - api := NewAPI(nil, nil, nil, nil) + api := NewAPI(nil, nil, nil, nil, chunk.NewTags()) f := api.Decryptor(ctx, "") err := f(me) @@ -500,3 +546,31 @@ func TestDetectContentType(t *testing.T) { }) } } + +// putString provides singleton manifest creation on top of api.API +func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { + r := strings.NewReader(content) + tag, err := a.Tags.New("unnamed-tag", 0) + + log.Trace("created new tag", "uid", tag.Uid) + + cCtx := sctx.SetTag(ctx, tag.Uid) + key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt) + if err != nil { + return nil, nil, err + } + manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) + r = strings.NewReader(manifest) + key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt) + if err != nil { + return nil, nil, err + } + tag.DoneSplit(key) + return key, func(ctx context.Context) error { + err := waitContent(ctx) + if err != nil { + return err + } + return waitManifest(ctx) + }, nil +} diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index 5e293cca72..9ad0948f43 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -40,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/pborman/uuid" @@ -75,6 +76,8 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err return "", err } req.ContentLength = size + req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix())) + res, err := http.DefaultClient.Do(req) if err != nil { return "", err @@ -111,6 +114,7 @@ func (c *Client) DownloadRaw(hash string) (io.ReadCloser, bool, error) { type File struct { io.ReadCloser api.ManifestEntry + Tag string } // Open opens a local file which can then be passed to client.Upload to upload @@ -139,6 +143,7 @@ func Open(path string) (*File, error) { Size: stat.Size(), ModTime: stat.ModTime(), }, + Tag: filepath.Base(path), }, nil } @@ -422,6 +427,7 @@ func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, erro // Uploader uploads files to swarm using a provided UploadFn type Uploader interface { Upload(UploadFn) error + Tag() string } type UploaderFunc func(UploadFn) error @@ -430,12 +436,23 @@ func (u UploaderFunc) Upload(upload UploadFn) error { return u(upload) } +func (u UploaderFunc) Tag() string { + return fmt.Sprintf("multipart_upload_%d", time.Now().Unix()) +} + +// DirectoryUploader implements Uploader +var _ Uploader = &DirectoryUploader{} + // DirectoryUploader uploads all files in a directory, optionally uploading // a file to the default path type DirectoryUploader struct { Dir string } +func (d *DirectoryUploader) Tag() string { + return filepath.Base(d.Dir) +} + // Upload performs the upload of the directory and default path func (d *DirectoryUploader) Upload(upload UploadFn) error { return filepath.Walk(d.Dir, func(path string, f os.FileInfo, err error) error { @@ -458,11 +475,17 @@ func (d *DirectoryUploader) Upload(upload UploadFn) error { }) } +var _ Uploader = &FileUploader{} + // FileUploader uploads a single file type FileUploader struct { File *File } +func (f *FileUploader) Tag() string { + return f.File.Tag +} + // Upload performs the upload of the file func (f *FileUploader) Upload(upload UploadFn) error { return upload(f.File) @@ -509,6 +532,14 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t req.URL.RawQuery = q.Encode() } + tag := uploader.Tag() + if tag == "" { + tag = "unnamed_tag_" + fmt.Sprintf("%d", time.Now().Unix()) + } + log.Trace("setting upload tag", "tag", tag) + + req.Header.Set(swarmhttp.SwarmTagHeaderName, tag) + // use 'Expect: 100-continue' so we don't send the request body if // the server refuses the request req.Header.Set("Expect", "100-continue") @@ -574,6 +605,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error) mw := multipart.NewWriter(reqW) req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary())) + req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix())) // define an UploadFn which adds files to the multipart form uploadFn := func(file *File) error { diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go index 9c9bde5d67..92489849c4 100644 --- a/swarm/api/client/client_test.go +++ b/swarm/api/client/client_test.go @@ -25,16 +25,14 @@ import ( "sort" "testing" - "github.com/ethereum/go-ethereum/swarm/testutil" - - "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/swarm/api" swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http" + "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" + "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" + "github.com/ethereum/go-ethereum/swarm/testutil" ) func serverFunc(api *api.API) swarmhttp.TestServer { @@ -68,6 +66,10 @@ func testClientUploadDownloadRaw(toEncrypt bool, t *testing.T) { t.Fatal(err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 1, 1, 0, 1) + // check we can download the same data res, isEncrypted, err := client.DownloadRaw(hash) if err != nil { @@ -209,6 +211,10 @@ func TestClientUploadDownloadDirectory(t *testing.T) { t.Fatalf("error uploading directory: %s", err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 9, 9, 0, 9) + // check we can download the individual files checkDownloadFile := func(path string, expected []byte) { file, err := client.Download(hash, path) @@ -323,6 +329,7 @@ func TestClientMultipartUpload(t *testing.T) { defer srv.Close() // define an uploader which uploads testDirFiles with some data + // note: this test should result in SEEN chunks. assert accordingly data := []byte("some-data") uploader := UploaderFunc(func(upload UploadFn) error { for _, name := range testDirFiles { @@ -348,6 +355,10 @@ func TestClientMultipartUpload(t *testing.T) { t.Fatal(err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 9, 9, 7, 9) + // check we can download the individual files checkDownloadFile := func(path string) { file, err := client.Download(hash, path) diff --git a/swarm/api/filesystem_test.go b/swarm/api/filesystem_test.go index 02f5bff658..b8f37fdd57 100644 --- a/swarm/api/filesystem_test.go +++ b/swarm/api/filesystem_test.go @@ -25,13 +25,14 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" ) var testDownloadDir, _ = ioutil.TempDir(os.TempDir(), "bzz-test") func testFileSystem(t *testing.T, f func(*FileSystem, bool)) { - testAPI(t, func(api *API, toEncrypt bool) { + testAPI(t, func(api *API, _ *chunk.Tags, toEncrypt bool) { f(NewFileSystem(api), toEncrypt) }) } diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 320da30462..e6e263f4c0 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -86,6 +87,54 @@ func InitLoggingResponseWriter(h http.Handler) http.Handler { }) } +// InitUploadTag creates a new tag for an upload to the local HTTP proxy +// if a tag is not named using the SwarmTagHeaderName, a fallback name will be used +// when the Content-Length header is set, an ETA on chunking will be available since the +// number of chunks to be split is known in advance (not including enclosing manifest chunks) +// the tag can later be accessed using the appropriate identifier in the request context +func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var ( + tagName string + err error + estimatedTotal int64 = 0 + contentType = r.Header.Get("Content-Type") + headerTag = r.Header.Get(SwarmTagHeaderName) + ) + if headerTag != "" { + tagName = headerTag + log.Trace("got tag name from http header", "tagName", tagName) + } else { + tagName = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix()) + } + + if !strings.Contains(contentType, "multipart") && r.ContentLength > 0 { + log.Trace("calculating tag size", "contentType", contentType, "contentLength", r.ContentLength) + uri := GetURI(r.Context()) + if uri != nil { + log.Debug("got uri from context") + if uri.Addr == "encrypt" { + estimatedTotal = calculateNumberOfChunks(r.ContentLength, true) + } else { + estimatedTotal = calculateNumberOfChunks(r.ContentLength, false) + } + } + } + + log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal) + + t, err := tags.New(tagName, estimatedTotal) + if err != nil { + log.Error("error creating tag", "err", err, "tagName", tagName) + } + + log.Trace("setting tag id to context", "uid", t.Uid) + ctx := sctx.SetTag(r.Context(), t.Uid) + + h.ServeHTTP(w, r.WithContext(ctx)) + }) +} + func InstrumentOpenTracing(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { uri := GetURI(r.Context()) diff --git a/swarm/api/http/response.go b/swarm/api/http/response.go index d4e81d7f67..c851a3992e 100644 --- a/swarm/api/http/response.go +++ b/swarm/api/http/response.go @@ -79,7 +79,7 @@ func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s } func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) { - log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code) + log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code, "msg", msg) respondTemplate(w, r, "error", msg, code) } diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index 3c6735a73e..a336bd82ff 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -26,6 +26,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "mime" "mime/multipart" "net/http" @@ -38,7 +39,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/rs/cors" @@ -60,6 +63,8 @@ var ( getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil) ) +const SwarmTagHeaderName = "x-swarm-tag" + type methodHandler map[string]http.Handler func (m methodHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { @@ -94,6 +99,12 @@ func NewServer(api *api.API, corsString string) *Server { InstrumentOpenTracing, } + tagAdapter := Adapter(func(h http.Handler) http.Handler { + return InitUploadTag(h, api.Tags) + }) + + defaultPostMiddlewares := append(defaultMiddlewares, tagAdapter) + mux := http.NewServeMux() mux.Handle("/bzz:/", methodHandler{ "GET": Adapt( @@ -102,7 +113,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostFiles), - defaultMiddlewares..., + defaultPostMiddlewares..., ), "DELETE": Adapt( http.HandlerFunc(server.HandleDelete), @@ -116,7 +127,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostRaw), - defaultMiddlewares..., + defaultPostMiddlewares..., ), }) mux.Handle("/bzz-immutable:/", methodHandler{ @@ -230,6 +241,12 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { ruid := GetRUID(r.Context()) log.Debug("handle.post.raw", "ruid", ruid) + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + postRawCount.Inc(1) toEncrypt := false @@ -256,13 +273,16 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { return } - addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) + addr, wait, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) if err != nil { postRawFail.Inc(1) respondError(w, r, err.Error(), http.StatusInternalServerError) return } + wait(r.Context()) + tag.DoneSplit(addr) + log.Debug("stored content", "ruid", ruid, "key", addr) w.Header().Set("Content-Type", "text/plain") @@ -311,7 +331,6 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } log.Debug("new manifest", "ruid", ruid, "key", addr) } - newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error { switch contentType { case "application/x-tar": @@ -334,6 +353,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { return } + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + + log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total()) + tag.DoneSplit(newAddr) + log.Debug("stored content", "ruid", ruid, "key", newAddr) w.Header().Set("Content-Type", "text/plain") @@ -342,7 +370,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (storage.Address, error) { - log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context())) + log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context())) defaultPath := r.URL.Query().Get("defaultpath") @@ -837,6 +865,28 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize)) } +// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length +func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 { + if contentLength < 4096 { + return 1 + } + branchingFactor := 128 + if isEncrypted { + branchingFactor = 64 + } + + dataChunks := math.Ceil(float64(contentLength) / float64(4096)) + totalChunks := dataChunks + intermediate := dataChunks / float64(branchingFactor) + + for intermediate > 1 { + totalChunks += math.Ceil(intermediate) + intermediate = intermediate / float64(branchingFactor) + } + + return int64(totalChunks) + 1 +} + // The size of buffer used for bufio.Reader on LazyChunkReader passed to // http.ServeContent in HandleGetFile. // Warning: This value influences the number of chunk requests and chunker join goroutines diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index e82762ce05..1de41d18d3 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -44,7 +44,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/api" - swarm "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/testutil" @@ -755,6 +754,7 @@ func testBzzTar(encrypted bool, t *testing.T) { t.Fatal(err) } req.Header.Add("Content-Type", "application/x-tar") + req.Header.Add(SwarmTagHeaderName, "test-upload") client := &http.Client{} resp2, err := client.Do(req) if err != nil { @@ -763,6 +763,11 @@ func testBzzTar(encrypted bool, t *testing.T) { if resp2.StatusCode != http.StatusOK { t.Fatalf("err %s", resp2.Status) } + + // check that the tag was written correctly + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 4, 4, 0, 4) + swarmHash, err := ioutil.ReadAll(resp2.Body) resp2.Body.Close() if err != nil { @@ -834,6 +839,75 @@ func testBzzTar(encrypted bool, t *testing.T) { t.Fatalf("file %s did not pass content assertion", hdr.Name) } } + + // now check the tags endpoint +} + +// TestBzzCorrectTagEstimate checks that the HTTP middleware sets the total number of chunks +// in the tag according to an estimate from the HTTP request Content-Length header divided +// by chunk size (4096). It is needed to be checked BEFORE chunking is done, therefore +// concurrency was introduced to slow down the HTTP request +func TestBzzCorrectTagEstimate(t *testing.T) { + srv := NewTestSwarmServer(t, serverFunc, nil) + defer srv.Close() + + for _, v := range []struct { + toEncrypt bool + expChunks int64 + }{ + {toEncrypt: false, expChunks: 248}, + {toEncrypt: true, expChunks: 250}, + } { + pr, pw := io.Pipe() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addr := "" + if v.toEncrypt { + addr = "encrypt" + } + req, err := http.NewRequest("POST", srv.URL+"/bzz:/"+addr, pr) + if err != nil { + t.Fatal(err) + } + + req = req.WithContext(ctx) + req.ContentLength = 1000000 + req.Header.Add(SwarmTagHeaderName, "1000000") + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Millisecond): + _, err := pw.Write([]byte{0}) + if err != nil { + t.Error(err) + } + } + } + }() + go func() { + transport := http.DefaultTransport + _, err := transport.RoundTrip(req) + if err != nil { + t.Error(err) + } + }() + done := false + for !done { + switch len(srv.Tags.All()) { + case 0: + <-time.After(10 * time.Millisecond) + case 1: + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 0, 0, 0, v.expChunks) + srv.Tags.Delete(tag.Uid) + done = true + } + } + } } // TestBzzRootRedirect tests that getting the root path of a manifest without @@ -851,19 +925,11 @@ func testBzzRootRedirect(toEncrypt bool, t *testing.T) { defer srv.Close() // create a manifest with some data at the root path - client := swarm.NewClient(srv.URL) data := []byte("data") - file := &swarm.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), - ManifestEntry: api.ManifestEntry{ - Path: "", - ContentType: "text/plain", - Size: int64(len(data)), - }, - } - hash, err := client.Upload(file, "", toEncrypt) - if err != nil { - t.Fatal(err) + headers := map[string]string{"Content-Type": "text/plain"} + res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader(data), headers, false, t) + if res.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK) } // define a CheckRedirect hook which ensures there is only a single @@ -1046,21 +1112,10 @@ func TestGet(t *testing.T) { func TestModify(t *testing.T) { srv := NewTestSwarmServer(t, serverFunc, nil) defer srv.Close() - - swarmClient := swarm.NewClient(srv.URL) - data := []byte("data") - file := &swarm.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), - ManifestEntry: api.ManifestEntry{ - Path: "", - ContentType: "text/plain", - Size: int64(len(data)), - }, - } - - hash, err := swarmClient.Upload(file, "", false) - if err != nil { - t.Fatal(err) + headers := map[string]string{"Content-Type": "text/plain"} + res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader([]byte("data")), headers, false, t) + if res.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK) } for _, testCase := range []struct { @@ -1283,6 +1338,46 @@ func TestBzzGetFileWithResolver(t *testing.T) { } } +// TestCalculateNumberOfChunks is a unit test for the chunk-number-according-to-content-length +// calculation +func TestCalculateNumberOfChunks(t *testing.T) { + + //test cases: + for _, tc := range []struct{ len, chunks int64 }{ + {len: 1000, chunks: 1}, + {len: 5000, chunks: 3}, + {len: 10000, chunks: 4}, + {len: 100000, chunks: 26}, + {len: 1000000, chunks: 248}, + {len: 325839339210, chunks: 79550620 + 621490 + 4856 + 38 + 1}, + } { + res := calculateNumberOfChunks(tc.len, false) + if res != tc.chunks { + t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res) + } + } +} + +// TestCalculateNumberOfChunksEncrypted is a unit test for the chunk-number-according-to-content-length +// calculation with encryption (branching factor=64) +func TestCalculateNumberOfChunksEncrypted(t *testing.T) { + + //test cases: + for _, tc := range []struct{ len, chunks int64 }{ + {len: 1000, chunks: 1}, + {len: 5000, chunks: 3}, + {len: 10000, chunks: 4}, + {len: 100000, chunks: 26}, + {len: 1000000, chunks: 245 + 4 + 1}, + {len: 325839339210, chunks: 79550620 + 1242979 + 19422 + 304 + 5 + 1}, + } { + res := calculateNumberOfChunks(tc.len, true) + if res != tc.chunks { + t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res) + } + } +} + // testResolver implements the Resolver interface and either returns the given // hash if it is set, or returns a "name not found" error type testResolveValidator struct { @@ -1308,6 +1403,7 @@ func (t *testResolveValidator) Resolve(addr string) (common.Hash, error) { func (t *testResolveValidator) Owner(node [32]byte) (addr common.Address, err error) { return } + func (t *testResolveValidator) HeaderByNumber(context.Context, *big.Int) (header *types.Header, err error) { return } diff --git a/swarm/api/http/test_server.go b/swarm/api/http/test_server.go index 928a6e972c..fbb3366e27 100644 --- a/swarm/api/http/test_server.go +++ b/swarm/api/http/test_server.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/storage/localstore" @@ -44,7 +45,9 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso t.Fatal(err) } - fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) + tags := chunk.NewTags() + fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams(), tags) + // Swarm feeds test setup feedsDir, err := ioutil.TempDir("", "swarm-feeds-test") if err != nil { @@ -56,12 +59,13 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso t.Fatal(err) } - swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil) + swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil, tags) apiServer := httptest.NewServer(serverFunc(swarmApi)) tss := &TestSwarmServer{ Server: apiServer, FileStore: fileStore, + Tags: tags, dir: swarmDir, Hasher: storage.MakeHashFunc(storage.DefaultHash)(), cleanup: func() { @@ -81,6 +85,7 @@ type TestSwarmServer struct { *httptest.Server Hasher storage.SwarmHash FileStore *storage.FileStore + Tags *chunk.Tags dir string cleanup func() CurrentTime uint64 diff --git a/swarm/api/manifest_test.go b/swarm/api/manifest_test.go index 1c8e53c433..c193ebcb4d 100644 --- a/swarm/api/manifest_test.go +++ b/swarm/api/manifest_test.go @@ -25,6 +25,7 @@ import ( "strings" "testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -42,7 +43,7 @@ func manifest(paths ...string) (manifestReader storage.LazySectionReader) { func testGetEntry(t *testing.T, path, match string, multiple bool, paths ...string) *manifestTrie { quitC := make(chan bool) - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(manifest(paths...), ref, fileStore, false, quitC, NOOPDecrypt) if err != nil { @@ -99,7 +100,7 @@ func TestGetEntry(t *testing.T) { func TestExactMatch(t *testing.T) { quitC := make(chan bool) mf := manifest("shouldBeExactMatch.css", "shouldBeExactMatch.css.map") - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(mf, ref, fileStore, false, quitC, nil) if err != nil { @@ -132,7 +133,7 @@ func TestAddFileWithManifestPath(t *testing.T) { reader := &storage.LazyTestSectionReader{ SectionReader: io.NewSectionReader(bytes.NewReader(manifest), 0, int64(len(manifest))), } - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(reader, ref, fileStore, false, nil, NOOPDecrypt) if err != nil { diff --git a/swarm/api/storage.go b/swarm/api/storage.go deleted file mode 100644 index 254375b777..0000000000 --- a/swarm/api/storage.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package api - -import ( - "context" - "path" - - "github.com/ethereum/go-ethereum/swarm/storage" -) - -type Response struct { - MimeType string - Status int - Size int64 - // Content []byte - Content string -} - -// implements a service -// -// DEPRECATED: Use the HTTP API instead -type Storage struct { - api *API -} - -func NewStorage(api *API) *Storage { - return &Storage{api} -} - -// Put uploads the content to the swarm with a simple manifest speficying -// its content type -// -// DEPRECATED: Use the HTTP API instead -func (s *Storage) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (storage.Address, func(context.Context) error, error) { - return s.api.Put(ctx, content, contentType, toEncrypt) -} - -// Get retrieves the content from bzzpath and reads the response in full -// It returns the Response object, which serialises containing the -// response body as the value of the Content field -// NOTE: if error is non-nil, sResponse may still have partial content -// the actual size of which is given in len(resp.Content), while the expected -// size is resp.Size -// -// DEPRECATED: Use the HTTP API instead -func (s *Storage) Get(ctx context.Context, bzzpath string) (*Response, error) { - uri, err := Parse(path.Join("bzz:/", bzzpath)) - if err != nil { - return nil, err - } - addr, err := s.api.Resolve(ctx, uri.Addr) - if err != nil { - return nil, err - } - reader, mimeType, status, _, err := s.api.Get(ctx, nil, addr, uri.Path) - if err != nil { - return nil, err - } - quitC := make(chan bool) - expsize, err := reader.Size(ctx, quitC) - if err != nil { - return nil, err - } - body := make([]byte, expsize) - size, err := reader.Read(body) - if int64(size) == expsize { - err = nil - } - return &Response{mimeType, status, expsize, string(body[:size])}, err -} diff --git a/swarm/api/storage_test.go b/swarm/api/storage_test.go deleted file mode 100644 index ef96972b68..0000000000 --- a/swarm/api/storage_test.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package api - -import ( - "context" - "testing" -) - -func testStorage(t *testing.T, f func(*Storage, bool)) { - testAPI(t, func(api *API, toEncrypt bool) { - f(NewStorage(api), toEncrypt) - }) -} - -func TestStoragePutGet(t *testing.T) { - testStorage(t, func(api *Storage, toEncrypt bool) { - content := "hello" - exp := expResponse(content, "text/plain", 0) - // exp := expResponse([]byte(content), "text/plain", 0) - ctx := context.TODO() - bzzkey, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - err = wait(ctx) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - bzzhash := bzzkey.Hex() - // to check put against the API#Get - resp0 := testGet(t, api.api, bzzhash, "") - checkResponse(t, resp0, exp) - - // check storage#Get - resp, err := api.Get(context.TODO(), bzzhash) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - checkResponse(t, &testResponse{nil, resp}, exp) - }) -} diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go index 359ac11acb..ee700d22bf 100644 --- a/swarm/chunk/tag.go +++ b/swarm/chunk/tag.go @@ -34,11 +34,11 @@ var ( type State = uint32 const ( - SPLIT State = iota // chunk has been processed by filehasher/swarm safe call - STORED // chunk stored locally - SEEN // chunk previously seen - SENT // chunk sent to neighbourhood - SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere + StateSplit State = iota // chunk has been processed by filehasher/swarm safe call + StateStored // chunk stored locally + StateSeen // chunk previously seen + StateSent // chunk sent to neighbourhood + StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere ) // Tag represents info on the status of new chunks @@ -46,18 +46,18 @@ type Tag struct { Uid uint32 // a unique identifier for this tag Name string // a name tag for this tag Address Address // the associated swarm hash for this tag - total uint32 // total chunks belonging to a tag - split uint32 // number of chunks already processed by splitter for hashing - seen uint32 // number of chunks already seen - stored uint32 // number of chunks already stored locally - sent uint32 // number of chunks sent for push syncing - synced uint32 // number of chunks synced with proof + total int64 // total chunks belonging to a tag + split int64 // number of chunks already processed by splitter for hashing + seen int64 // number of chunks already seen + stored int64 // number of chunks already stored locally + sent int64 // number of chunks sent for push syncing + synced int64 // number of chunks synced with proof startedAt time.Time // tag started to calculate ETA } // New creates a new tag, stores it by the name and returns it // it returns an error if the tag with this name already exists -func NewTag(uid uint32, s string, total uint32) *Tag { +func NewTag(uid uint32, s string, total int64) *Tag { t := &Tag{ Uid: uid, Name: s, @@ -69,65 +69,65 @@ func NewTag(uid uint32, s string, total uint32) *Tag { // Inc increments the count for a state func (t *Tag) Inc(state State) { - var v *uint32 + var v *int64 switch state { - case SPLIT: + case StateSplit: v = &t.split - case STORED: + case StateStored: v = &t.stored - case SEEN: + case StateSeen: v = &t.seen - case SENT: + case StateSent: v = &t.sent - case SYNCED: + case StateSynced: v = &t.synced } - atomic.AddUint32(v, 1) + atomic.AddInt64(v, 1) } // Get returns the count for a state on a tag -func (t *Tag) Get(state State) int { - var v *uint32 +func (t *Tag) Get(state State) int64 { + var v *int64 switch state { - case SPLIT: + case StateSplit: v = &t.split - case STORED: + case StateStored: v = &t.stored - case SEEN: + case StateSeen: v = &t.seen - case SENT: + case StateSent: v = &t.sent - case SYNCED: + case StateSynced: v = &t.synced } - return int(atomic.LoadUint32(v)) + return atomic.LoadInt64(v) } // GetTotal returns the total count -func (t *Tag) Total() int { - return int(atomic.LoadUint32(&t.total)) +func (t *Tag) Total() int64 { + return atomic.LoadInt64(&t.total) } // DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag // is meant to be called when splitter finishes for input streams of unknown size -func (t *Tag) DoneSplit(address Address) int { - total := atomic.LoadUint32(&t.split) - atomic.StoreUint32(&t.total, total) +func (t *Tag) DoneSplit(address Address) int64 { + total := atomic.LoadInt64(&t.split) + atomic.StoreInt64(&t.total, total) t.Address = address - return int(total) + return total } // Status returns the value of state and the total count -func (t *Tag) Status(state State) (int, int, error) { - count, seen, total := t.Get(state), int(atomic.LoadUint32(&t.seen)), int(atomic.LoadUint32(&t.total)) +func (t *Tag) Status(state State) (int64, int64, error) { + count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total) if total == 0 { return count, total, errNA } switch state { - case SPLIT, STORED, SEEN: + case StateSplit, StateStored, StateSeen: return count, total, nil - case SENT, SYNCED: - stored := int(atomic.LoadUint32(&t.stored)) + case StateSent, StateSynced: + stored := atomic.LoadInt64(&t.stored) if stored < total { return count, total - seen, errNA } @@ -152,14 +152,14 @@ func (t *Tag) ETA(state State) (time.Time, error) { // MarshalBinary marshals the tag into a byte slice func (tag *Tag) MarshalBinary() (data []byte, err error) { - buffer := make([]byte, 0) - encodeUint32Append(&buffer, tag.Uid) - encodeUint32Append(&buffer, tag.total) - encodeUint32Append(&buffer, tag.split) - encodeUint32Append(&buffer, tag.seen) - encodeUint32Append(&buffer, tag.stored) - encodeUint32Append(&buffer, tag.sent) - encodeUint32Append(&buffer, tag.synced) + buffer := make([]byte, 4) + binary.BigEndian.PutUint32(buffer, tag.Uid) + encodeInt64Append(&buffer, tag.total) + encodeInt64Append(&buffer, tag.split) + encodeInt64Append(&buffer, tag.seen) + encodeInt64Append(&buffer, tag.stored) + encodeInt64Append(&buffer, tag.sent) + encodeInt64Append(&buffer, tag.synced) intBuffer := make([]byte, 8) @@ -181,14 +181,15 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error { if len(buffer) < 13 { return errors.New("buffer too short") } + tag.Uid = binary.BigEndian.Uint32(buffer) + buffer = buffer[4:] - tag.Uid = decodeUint32Splice(&buffer) - tag.total = decodeUint32Splice(&buffer) - tag.split = decodeUint32Splice(&buffer) - tag.seen = decodeUint32Splice(&buffer) - tag.stored = decodeUint32Splice(&buffer) - tag.sent = decodeUint32Splice(&buffer) - tag.synced = decodeUint32Splice(&buffer) + tag.total = decodeInt64Splice(&buffer) + tag.split = decodeInt64Splice(&buffer) + tag.seen = decodeInt64Splice(&buffer) + tag.stored = decodeInt64Splice(&buffer) + tag.sent = decodeInt64Splice(&buffer) + tag.synced = decodeInt64Splice(&buffer) t, n := binary.Varint(buffer) tag.startedAt = time.Unix(t, 0) @@ -202,17 +203,16 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error { tag.Name = string(buffer[t:]) return nil - } -func encodeUint32Append(buffer *[]byte, val uint32) { - intBuffer := make([]byte, 4) - binary.BigEndian.PutUint32(intBuffer, val) - *buffer = append(*buffer, intBuffer...) +func encodeInt64Append(buffer *[]byte, val int64) { + intBuffer := make([]byte, 8) + n := binary.PutVarint(intBuffer, val) + *buffer = append(*buffer, intBuffer[:n]...) } -func decodeUint32Splice(buffer *[]byte) uint32 { - val := binary.BigEndian.Uint32((*buffer)[:4]) - *buffer = (*buffer)[4:] +func decodeInt64Splice(buffer *[]byte) int64 { + val, n := binary.Varint((*buffer)) + *buffer = (*buffer)[n:] return val } diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go index b3f3be2cad..e6acfb185b 100644 --- a/swarm/chunk/tag_test.go +++ b/swarm/chunk/tag_test.go @@ -24,7 +24,7 @@ import ( ) var ( - allStates = []State{SPLIT, STORED, SEEN, SENT, SYNCED} + allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced} ) // TestTagSingleIncrements tests if Inc increments the tag state value @@ -34,14 +34,14 @@ func TestTagSingleIncrements(t *testing.T) { tc := []struct { state uint32 inc int - expcount int - exptotal int + expcount int64 + exptotal int64 }{ - {state: SPLIT, inc: 10, expcount: 10, exptotal: 10}, - {state: STORED, inc: 9, expcount: 9, exptotal: 9}, - {state: SEEN, inc: 1, expcount: 1, exptotal: 10}, - {state: SENT, inc: 9, expcount: 9, exptotal: 9}, - {state: SYNCED, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSplit, inc: 10, expcount: 10, exptotal: 10}, + {state: StateStored, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSeen, inc: 1, expcount: 1, exptotal: 10}, + {state: StateSent, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSynced, inc: 9, expcount: 9, exptotal: 9}, } for _, tc := range tc { @@ -60,24 +60,24 @@ func TestTagSingleIncrements(t *testing.T) { // TestTagStatus is a unit test to cover Tag.Status method functionality func TestTagStatus(t *testing.T) { tg := &Tag{total: 10} - tg.Inc(SEEN) - tg.Inc(SENT) - tg.Inc(SYNCED) + tg.Inc(StateSeen) + tg.Inc(StateSent) + tg.Inc(StateSynced) for i := 0; i < 10; i++ { - tg.Inc(SPLIT) - tg.Inc(STORED) + tg.Inc(StateSplit) + tg.Inc(StateStored) } for _, v := range []struct { state State - expVal int - expTotal int + expVal int64 + expTotal int64 }{ - {state: STORED, expVal: 10, expTotal: 10}, - {state: SPLIT, expVal: 10, expTotal: 10}, - {state: SEEN, expVal: 1, expTotal: 10}, - {state: SENT, expVal: 1, expTotal: 9}, - {state: SYNCED, expVal: 1, expTotal: 9}, + {state: StateStored, expVal: 10, expTotal: 10}, + {state: StateSplit, expVal: 10, expTotal: 10}, + {state: StateSeen, expVal: 1, expTotal: 10}, + {state: StateSent, expVal: 1, expTotal: 9}, + {state: StateSynced, expVal: 1, expTotal: 9}, } { val, total, err := tg.Status(v.state) if err != nil { @@ -98,8 +98,8 @@ func TestTagETA(t *testing.T) { maxDiff := 100000 // 100 microsecond tg := &Tag{total: 10, startedAt: now} time.Sleep(100 * time.Millisecond) - tg.Inc(SPLIT) - eta, err := tg.ETA(SPLIT) + tg.Inc(StateSplit) + eta, err := tg.ETA(StateSplit) if err != nil { t.Fatal(err) } @@ -128,7 +128,7 @@ func TestTagConcurrentIncrements(t *testing.T) { wg.Wait() for _, f := range allStates { v := tg.Get(f) - if v != n { + if v != int64(n) { t.Fatalf("expected state %v to be %v, got %v", f, n, v) } } @@ -142,7 +142,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { wg.Add(10 * 5 * n) for i := 0; i < 10; i++ { s := string([]byte{uint8(i)}) - tag, err := ts.New(s, n) + tag, err := ts.New(s, int64(n)) if err != nil { t.Fatal(err) } @@ -168,7 +168,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { t.Fatal(err) } stateVal := tag.Get(f) - if stateVal != n { + if stateVal != int64(n) { t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v) } } diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go index 07f9b8cd77..435f5d7068 100644 --- a/swarm/chunk/tags.go +++ b/swarm/chunk/tags.go @@ -42,12 +42,12 @@ func NewTags() *Tags { // New creates a new tag, stores it by the name and returns it // it returns an error if the tag with this name already exists -func (ts *Tags) New(s string, total int) (*Tag, error) { +func (ts *Tags) New(s string, total int64) (*Tag, error) { t := &Tag{ Uid: ts.rng.Uint32(), Name: s, startedAt: time.Now(), - total: uint32(total), + total: total, } if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { return nil, errExists @@ -55,6 +55,18 @@ func (ts *Tags) New(s string, total int) (*Tag, error) { return t, nil } +// All returns all existing tags in Tags' sync.Map +// Note that tags are returned in no particular order +func (ts *Tags) All() (t []*Tag) { + ts.tags.Range(func(k, v interface{}) bool { + t = append(t, v.(*Tag)) + + return true + }) + + return t +} + // Get returns the undelying tag for the uid or an error if not found func (ts *Tags) Get(uid uint32) (*Tag, error) { t, ok := ts.tags.Load(uid) @@ -64,8 +76,8 @@ func (ts *Tags) Get(uid uint32) (*Tag, error) { return t.(*Tag), nil } -// GetContext gets a tag from the tag uid stored in the context -func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { +// GetFromContext gets a tag from the tag uid stored in the context +func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) { uid := sctx.GetTag(ctx) t, ok := ts.tags.Load(uid) if !ok { @@ -78,3 +90,7 @@ func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { func (ts *Tags) Range(fn func(k, v interface{}) bool) { ts.tags.Range(fn) } + +func (ts *Tags) Delete(k interface{}) { + ts.tags.Delete(k) +} diff --git a/swarm/chunk/tags_test.go b/swarm/chunk/tags_test.go new file mode 100644 index 0000000000..f818c4c5ca --- /dev/null +++ b/swarm/chunk/tags_test.go @@ -0,0 +1,48 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import "testing" + +func TestAll(t *testing.T) { + ts := NewTags() + + ts.New("1", 1) + ts.New("2", 1) + + all := ts.All() + + if len(all) != 2 { + t.Fatalf("expected length to be 2 got %d", len(all)) + } + + if n := all[0].Total(); n != 1 { + t.Fatalf("expected tag 0 total to be 1 got %d", n) + } + + if n := all[1].Total(); n != 1 { + t.Fatalf("expected tag 1 total to be 1 got %d", n) + } + + ts.New("3", 1) + all = ts.All() + + if len(all) != 3 { + t.Fatalf("expected length to be 3 got %d", len(all)) + } + +} diff --git a/swarm/fuse/swarmfs_test.go b/swarm/fuse/swarmfs_test.go index 460e31c4e9..77573f0fc7 100644 --- a/swarm/fuse/swarmfs_test.go +++ b/swarm/fuse/swarmfs_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" @@ -1614,11 +1615,11 @@ func TestFUSE(t *testing.T) { } defer os.RemoveAll(datadir) - fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags()) if err != nil { t.Fatal(err) } - ta := &testAPI{api: api.NewAPI(fileStore, nil, nil, nil)} + ta := &testAPI{api: api.NewAPI(fileStore, nil, nil, nil, chunk.NewTags())} //run a short suite of tests //approx time: 28s diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 8e6be72b64..615b3b68f9 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -127,7 +127,7 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, return nil, nil, nil, err } - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags()) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) delivery := NewDelivery(kad, netStore) diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 5f73f7cc82..fc0f9d5dfc 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -380,7 +380,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) i++ } //...which then gets passed to the round-robin file store - roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags()) //now we can actually upload a (random) file to the round-robin store size := chunkCount * chunkSize log.Debug("Storing data to file store") diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index fefdb7c9f8..da4ff673b1 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -298,7 +298,7 @@ func mapKeysToNodes(conf *synctestConfig) { //upload a file(chunks) to a single local node store func uploadFileToSingleNodeStore(id enode.ID, chunkCount int, store chunk.Store) ([]storage.Address, error) { log.Debug(fmt.Sprintf("Uploading to node id: %s", id)) - fileStore := storage.NewFileStore(store, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(store, storage.NewFileStoreParams(), chunk.NewTags()) size := chunkSize var rootAddrs []storage.Address for i := 0; i < chunkCount; i++ { diff --git a/swarm/network_test.go b/swarm/network_test.go index 97bdd07b14..1a8c992a3a 100644 --- a/swarm/network_test.go +++ b/swarm/network_test.go @@ -23,11 +23,13 @@ import ( "io/ioutil" "math/rand" "os" + "strings" "sync" "sync/atomic" "testing" "time" + "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/testutil" "github.com/ethereum/go-ethereum/crypto" @@ -416,7 +418,7 @@ func uploadFile(swarm *Swarm) (storage.Address, string, error) { // uniqueness is very certain. data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b) ctx := context.TODO() - k, wait, err := swarm.api.Put(ctx, data, "text/plain", false) + k, wait, err := putString(ctx, swarm.api, data, "text/plain", false) if err != nil { return nil, "", err } @@ -530,3 +532,31 @@ func retrieve( return uint64(totalCheckCount) - atomic.LoadUint64(totalFoundCount) } + +// putString provides singleton manifest creation on top of api.API +func putString(ctx context.Context, a *api.API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { + r := strings.NewReader(content) + tag, err := a.Tags.New("unnamed-tag", 0) + + log.Trace("created new tag", "uid", tag.Uid) + + cCtx := sctx.SetTag(ctx, tag.Uid) + key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt) + if err != nil { + return nil, nil, err + } + manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) + r = strings.NewReader(manifest) + key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt) + if err != nil { + return nil, nil, err + } + tag.DoneSplit(key) + return key, func(ctx context.Context) error { + err := waitContent(ctx) + if err != nil { + return err + } + return waitManifest(ctx) + }, nil +} diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index 9a12594443..a0fe2e7697 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -24,6 +24,7 @@ import ( "io" "testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -42,8 +43,10 @@ type chunkerTester struct { t test } +var mockTag = chunk.NewTag(0, "mock-tag", 0) + func newTestHasherStore(store ChunkStore, hash string) *hasherStore { - return NewHasherStore(store, MakeHashFunc(hash), false) + return NewHasherStore(store, MakeHashFunc(hash), false, chunk.NewTag(0, "test-tag", 0)) } func testRandomBrokenData(n int, tester *chunkerTester) { @@ -91,7 +94,7 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) var err error ctx := context.TODO() if usePyramid { - addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter) + addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter, mockTag) } else { addr, wait, err = TreeSplit(ctx, data, int64(n), putGetter) } @@ -188,7 +191,7 @@ func TestDataAppend(t *testing.T) { putGetter := newTestHasherStore(store, SHA3Hash) ctx := context.TODO() - addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { tester.t.Fatalf(err.Error()) } @@ -208,7 +211,7 @@ func TestDataAppend(t *testing.T) { } putGetter = newTestHasherStore(store, SHA3Hash) - newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter) + newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter, mockTag) if err != nil { tester.t.Fatalf(err.Error()) } @@ -278,7 +281,7 @@ func benchmarkSplitJoin(n int, t *testing.B) { putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash) ctx := context.TODO() - key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -335,7 +338,7 @@ func benchmarkSplitPyramidBMT(n int, t *testing.B) { putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash) ctx := context.Background() - _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -353,7 +356,7 @@ func benchmarkSplitPyramidSHA3(n int, t *testing.B) { putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash) ctx := context.Background() - _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -374,7 +377,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { putGetter := newTestHasherStore(store, SHA3Hash) ctx := context.Background() - key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -384,7 +387,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { } putGetter = newTestHasherStore(store, SHA3Hash) - _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter) + _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } diff --git a/swarm/storage/filestore.go b/swarm/storage/filestore.go index 2b15f7da68..dc096e56cb 100644 --- a/swarm/storage/filestore.go +++ b/swarm/storage/filestore.go @@ -47,6 +47,7 @@ const ( type FileStore struct { ChunkStore hashFunc SwarmHasher + tags *chunk.Tags } type FileStoreParams struct { @@ -60,19 +61,20 @@ func NewFileStoreParams() *FileStoreParams { } // for testing locally -func NewLocalFileStore(datadir string, basekey []byte) (*FileStore, error) { +func NewLocalFileStore(datadir string, basekey []byte, tags *chunk.Tags) (*FileStore, error) { localStore, err := localstore.New(datadir, basekey, nil) if err != nil { return nil, err } - return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams()), nil + return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams(), tags), nil } -func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore { +func NewFileStore(store ChunkStore, params *FileStoreParams, tags *chunk.Tags) *FileStore { hashFunc := MakeHashFunc(params.Hash) return &FileStore{ ChunkStore: store, hashFunc: hashFunc, + tags: tags, } } @@ -83,7 +85,11 @@ func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore { // It returns a reader with the chunk data and whether the content was encrypted func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) { isEncrypted = len(addr) > f.hashFunc().Size() - getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted) + tag, err := f.tags.GetFromContext(ctx) + if err != nil { + tag = chunk.NewTag(0, "ephemeral-retrieval-tag", 0) + } + getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted, tag) reader = TreeJoin(ctx, addr, getter, 0) return } @@ -91,8 +97,17 @@ func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChu // Store is a public API. Main entry point for document storage directly. Used by the // FS-aware API and httpaccess func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) { - putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt) - return PyramidSplit(ctx, data, putter, putter) + tag, err := f.tags.GetFromContext(ctx) + if err != nil { + // some of the parts of the codebase, namely the manifest trie, do not store the context + // of the original request nor the tag with the trie, recalculating the trie hence + // loses the tag uid. thus we create an ephemeral tag here for that purpose + + tag = chunk.NewTag(0, "", 0) + //return nil, nil, err + } + putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt, tag) + return PyramidSplit(ctx, data, putter, putter, tag) } func (f *FileStore) HashSize() int { @@ -101,12 +116,14 @@ func (f *FileStore) HashSize() int { // GetAllReferences is a public API. This endpoint returns all chunk hashes (only) for a given file func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader, toEncrypt bool) (addrs AddressCollection, err error) { + tag := chunk.NewTag(0, "ephemeral-tag", 0) //this tag is just a mock ephemeral tag since we don't want to save these results + // create a special kind of putter, which only will store the references putter := &hashExplorer{ - hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt), + hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt, tag), } // do the actual splitting anyway, no way around it - _, wait, err := PyramidSplit(ctx, data, putter, putter) + _, wait, err := PyramidSplit(ctx, data, putter, putter, tag) if err != nil { return nil, err } diff --git a/swarm/storage/filestore_test.go b/swarm/storage/filestore_test.go index fe01eed9aa..d0a167a244 100644 --- a/swarm/storage/filestore_test.go +++ b/swarm/storage/filestore_test.go @@ -25,6 +25,7 @@ import ( "path/filepath" "testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage/localstore" "github.com/ethereum/go-ethereum/swarm/testutil" ) @@ -48,7 +49,7 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { } defer localStore.Close() - fileStore := NewFileStore(localStore, NewFileStoreParams()) + fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags()) slice := testutil.RandomBytes(1, testDataSize) ctx := context.TODO() @@ -113,7 +114,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { } defer localStore.Close() - fileStore := NewFileStore(localStore, NewFileStoreParams()) + fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags()) slice := testutil.RandomBytes(1, testDataSize) ctx := context.TODO() key, wait, err := fileStore.Store(ctx, bytes.NewReader(slice), testDataSize, toEncrypt) @@ -182,7 +183,7 @@ func TestGetAllReferences(t *testing.T) { } defer localStore.Close() - fileStore := NewFileStore(localStore, NewFileStoreParams()) + fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags()) // testRuns[i] and expectedLen[i] are dataSize and expected length respectively testRuns := []int{1024, 8192, 16000, 30000, 1000000} diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index 2e4a1c11b0..1e702f11ae 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -28,6 +28,7 @@ import ( type hasherStore struct { store ChunkStore + tag *chunk.Tag toEncrypt bool hashFunc SwarmHasher hashSize int // content hash size @@ -44,7 +45,7 @@ type hasherStore struct { // NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces. // With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore // and the hasherStore will take core of encryption/decryption of data if necessary -func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore { +func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool, tag *chunk.Tag) *hasherStore { hashSize := hashFunc().Size() refSize := int64(hashSize) if toEncrypt { @@ -53,6 +54,7 @@ func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *has h := &hasherStore{ store: store, + tag: tag, toEncrypt: toEncrypt, hashFunc: hashFunc, hashSize: hashSize, @@ -242,7 +244,11 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) { atomic.AddUint64(&h.nrChunks, 1) go func() { - _, err := h.store.Put(ctx, chunk.ModePutUpload, ch) + seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch) + h.tag.Inc(chunk.StateStored) + if seen { + h.tag.Inc(chunk.StateSeen) + } select { case h.errC <- err: case <-h.quitC: diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go index c95537db73..9dfd7ab1d9 100644 --- a/swarm/storage/hasherstore_test.go +++ b/swarm/storage/hasherstore_test.go @@ -43,7 +43,7 @@ func TestHasherStore(t *testing.T) { for _, tt := range tests { chunkStore := NewMapChunkStore() - hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt) + hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 0)) // Put two random chunks into the hasherStore chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data() diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 281bbe9fe3..9b0d5397b8 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -96,12 +96,12 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Address), the root hash of the entire content will fill this once processing finishes. New chunks to store are store using the putter which the caller provides. */ -func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx) +func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) { + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize), tag).Split(ctx) } -func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx) +func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) { + return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize), tag).Append(ctx) } // Entry to create a tree node @@ -142,6 +142,7 @@ type PyramidChunker struct { putter Putter getter Getter key Address + tag *chunk.Tag workerCount int64 workerLock sync.RWMutex jobC chan *chunkJob @@ -152,7 +153,7 @@ type PyramidChunker struct { chunkLevel [][]*TreeEntry } -func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { +func NewPyramidSplitter(params *PyramidSplitterParams, tag *chunk.Tag) (pc *PyramidChunker) { pc = &PyramidChunker{} pc.reader = params.reader pc.hashSize = params.hashSize @@ -161,6 +162,7 @@ func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { pc.putter = params.putter pc.getter = params.getter pc.key = params.addr + pc.tag = tag pc.workerCount = 0 pc.jobC = make(chan *chunkJob, 2*ChunkProcessors) pc.wg = &sync.WaitGroup{} @@ -273,6 +275,7 @@ func (pc *PyramidChunker) processor(ctx context.Context, id int64) { return } pc.processChunk(ctx, id, job) + pc.tag.Inc(chunk.StateSplit) case <-pc.quitC: return } diff --git a/swarm/swarm.go b/swarm/swarm.go index 2f025d9cc9..d004bcd2f4 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -211,9 +211,10 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e MaxPeerServers: config.MaxStreamPeerServers, } self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap) + tags := chunk.NewTags() //todo load from state store // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage - self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams) + self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams, tags) log.Debug("Setup local storage") @@ -228,7 +229,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e pss.SetHandshakeController(self.ps, pss.NewHandshakeParams()) } - self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey) + self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey, tags) self.sfs = fuse.NewSwarmFS(self.api) log.Debug("Initialized FUSE filesystem") diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index 2a5b28513a..cf2afaec9e 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/sctx" ) // TestNewSwarm validates Swarm fields in repsect to the provided configuration. @@ -352,8 +353,11 @@ func testLocalStoreAndRetrieve(t *testing.T, swarm *Swarm, n int, randomData boo rand.Read(slice) } dataPut := string(slice) - - ctx := context.TODO() + tag, err := swarm.api.Tags.New("test-local-store-and-retrieve", 0) + if err != nil { + t.Fatal(err) + } + ctx := sctx.SetTag(context.Background(), tag.Uid) k, wait, err := swarm.api.Store(ctx, strings.NewReader(dataPut), int64(len(dataPut)), false) if err != nil { t.Fatal(err) diff --git a/swarm/testutil/tag.go b/swarm/testutil/tag.go new file mode 100644 index 0000000000..d9908f11bd --- /dev/null +++ b/swarm/testutil/tag.go @@ -0,0 +1,51 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package testutil + +import ( + "testing" + + "github.com/ethereum/go-ethereum/swarm/chunk" +) + +// CheckTag checks the first tag in the api struct to be in a certain state +func CheckTag(t *testing.T, tag *chunk.Tag, split, stored, seen, total int64) { + t.Helper() + if tag == nil { + t.Fatal("no tag found") + } + + tSplit := tag.Get(chunk.StateSplit) + if tSplit != split { + t.Fatalf("should have had split chunks, got %d want %d", tSplit, split) + } + + tSeen := tag.Get(chunk.StateSeen) + if tSeen != seen { + t.Fatalf("should have had seen chunks, got %d want %d", tSeen, seen) + } + + tStored := tag.Get(chunk.StateStored) + if tStored != stored { + t.Fatalf("mismatch stored chunks, got %d want %d", tStored, stored) + } + + tTotal := tag.Total() + if tTotal != total { + t.Fatalf("mismatch total chunks, got %d want %d", tTotal, total) + } +}