diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index e9380813a3..43d2c1ff51 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -37,15 +37,16 @@ var ( ) var ( - allhosts string - hosts []string - filesize int - syncDelay int - httpPort int - wsPort int - verbosity int - timeout int - single bool + allhosts string + hosts []string + filesize int + syncDelay int + httpPort int + wsPort int + verbosity int + timeout int + single bool + trackTimeout int ) func main() { @@ -102,6 +103,12 @@ func main() { Usage: "whether to fetch content from a single node or from all nodes", Destination: &single, }, + cli.IntFlag{ + Name: "track-timeout", + Value: 5, + Usage: "timeout in seconds to wait for GetAllReferences to return", + Destination: &trackTimeout, + }, } app.Flags = append(app.Flags, []cli.Flag{ diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index b2858e2275..90230df253 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -18,13 +18,19 @@ package main import ( "bytes" + "context" "fmt" + "io/ioutil" "math/rand" + "os" "sync" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" "github.com/pborman/uuid" @@ -49,12 +55,75 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { case <-time.After(time.Duration(timeout) * time.Second): metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1) + e := fmt.Errorf("timeout after %v sec", timeout) // trigger debug functionality on randomBytes + err := trackChunks(randomBytes[:]) + if err != nil { + e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err) + } - return fmt.Errorf("timeout after %v sec", timeout) + return e } } +func trackChunks(testData []byte) error { + log.Warn("Test timed out; running chunk debug sequence") + + addrs, err := getAllRefs(testData) + if err != nil { + return err + } + log.Trace("All references retrieved") + + // has-chunks + for _, host := range hosts { + httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + log.Trace("Calling `Has` on host", "httpHost", httpHost) + rpcClient, err := rpc.Dial(httpHost) + if err != nil { + log.Trace("Error dialing host", "err", err) + return err + } + log.Trace("rpc dial ok") + var hasInfo []api.HasInfo + err = rpcClient.Call(&hasInfo, "bzz_has", addrs) + if err != nil { + log.Trace("Error calling host", "err", err) + return err + } + log.Trace("rpc call ok") + count := 0 + for _, info := range hasInfo { + if !info.Has { + count++ + log.Error("Host does not have chunk", "host", httpHost, "chunk", info.Addr) + } + } + if count == 0 { + log.Info("Host reported to have all chunks", "host", httpHost) + } + } + return nil +} + +func getAllRefs(testData []byte) (storage.AddressCollection, error) { + log.Trace("Getting all references for given root hash") + datadir, err := ioutil.TempDir("", "chunk-debug") + if err != nil { + return nil, fmt.Errorf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(datadir) + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second) + defer cancel() + + reader := bytes.NewReader(testData) + return fileStore.GetAllReferences(ctx, reader, false) +} + func uplaodAndSync(c *cli.Context, randomBytes []byte, tuid string) error { log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed)