Enable longrunning tests to run (#19208)
* p2p/simulations: increased snapshot load timeout for debugging * swarm/network/stream: less nodes for snapshot longrunning tests * swarm/network: fixed longrunning tests * swarm/network/stream: store kademlia in bucket * swarm/network/stream: disabled healthy check in delivery tests * swarm/network/stream: longer SyncUpdateDelay for longrunning tests * swarm/network/stream: more debug output * swarm/network/stream: reduced longrunning snapshot tests to 64 nodes * swarm/network/stream: don't WaitTillHealthy in SyncerSimulation * swarm/network/stream: cleanup for PR
This commit is contained in:
parent
216bd2ceba
commit
81ed700157
|
@ -840,7 +840,8 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
|
||||||
return snap, nil
|
return snap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var snapshotLoadTimeout = 120 * time.Second
|
// longrunning tests may need a longer timeout
|
||||||
|
var snapshotLoadTimeout = 900 * time.Second
|
||||||
|
|
||||||
// Load loads a network snapshot
|
// Load loads a network snapshot
|
||||||
func (net *Network) Load(snap *Snapshot) error {
|
func (net *Network) Load(snap *Snapshot) error {
|
||||||
|
|
|
@ -134,6 +134,9 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map,
|
||||||
bucket.Store(bucketKeyDB, netStore)
|
bucket.Store(bucketKeyDB, netStore)
|
||||||
bucket.Store(bucketKeyDelivery, delivery)
|
bucket.Store(bucketKeyDelivery, delivery)
|
||||||
bucket.Store(bucketKeyFileStore, fileStore)
|
bucket.Store(bucketKeyFileStore, fileStore)
|
||||||
|
// for the kademlia object, we use the global key from the simulation package,
|
||||||
|
// as the simulation will try to access it in the WaitTillHealthy with that key
|
||||||
|
bucket.Store(simulation.BucketKeyKademlia, kad)
|
||||||
|
|
||||||
cleanup := func() {
|
cleanup := func() {
|
||||||
netStore.Close()
|
netStore.Close()
|
||||||
|
|
|
@ -534,12 +534,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Waiting for kademlia")
|
|
||||||
// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
|
|
||||||
if _, err := sim.WaitTillHealthy(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
//get the pivot node's filestore
|
//get the pivot node's filestore
|
||||||
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
|
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -53,7 +53,7 @@ func TestFileRetrieval(t *testing.T) {
|
||||||
nodeCount = []int{16}
|
nodeCount = []int{16}
|
||||||
|
|
||||||
if *longrunning {
|
if *longrunning {
|
||||||
nodeCount = append(nodeCount, 32, 64, 128)
|
nodeCount = append(nodeCount, 32, 64)
|
||||||
} else if testutil.RaceEnabled {
|
} else if testutil.RaceEnabled {
|
||||||
nodeCount = []int{4}
|
nodeCount = []int{4}
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ func TestRetrieval(t *testing.T) {
|
||||||
chnkCnt := []int{32}
|
chnkCnt := []int{32}
|
||||||
|
|
||||||
if *longrunning {
|
if *longrunning {
|
||||||
nodeCnt = []int{16, 32, 128}
|
nodeCnt = []int{16, 32, 64}
|
||||||
chnkCnt = []int{4, 32, 256}
|
chnkCnt = []int{4, 32, 256}
|
||||||
} else if testutil.RaceEnabled {
|
} else if testutil.RaceEnabled {
|
||||||
nodeCnt = []int{4}
|
nodeCnt = []int{4}
|
||||||
|
@ -113,10 +113,15 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncUpdateDelay := 1 * time.Second
|
||||||
|
if *longrunning {
|
||||||
|
syncUpdateDelay = 3 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
|
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
|
||||||
Retrieval: RetrievalEnabled,
|
Retrieval: RetrievalEnabled,
|
||||||
Syncing: SyncingAutoSubscribe,
|
Syncing: SyncingAutoSubscribe,
|
||||||
SyncUpdateDelay: 3 * time.Second,
|
SyncUpdateDelay: syncUpdateDelay,
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
cleanup = func() {
|
cleanup = func() {
|
||||||
|
@ -140,7 +145,7 @@ func runFileRetrievalTest(nodeCount int) error {
|
||||||
sim := simulation.New(retrievalSimServiceMap)
|
sim := simulation.New(retrievalSimServiceMap)
|
||||||
defer sim.Close()
|
defer sim.Close()
|
||||||
|
|
||||||
log.Info("Initializing test config")
|
log.Info("Initializing test config", "node count", nodeCount)
|
||||||
|
|
||||||
conf := &synctestConfig{}
|
conf := &synctestConfig{}
|
||||||
//map of discover ID to indexes of chunks expected at that ID
|
//map of discover ID to indexes of chunks expected at that ID
|
||||||
|
@ -158,6 +163,8 @@ func runFileRetrievalTest(nodeCount int) error {
|
||||||
ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
|
ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||||
defer cancelSimRun()
|
defer cancelSimRun()
|
||||||
|
|
||||||
|
log.Info("Starting simulation")
|
||||||
|
|
||||||
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
|
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
|
||||||
nodeIDs := sim.UpNodeIDs()
|
nodeIDs := sim.UpNodeIDs()
|
||||||
for _, n := range nodeIDs {
|
for _, n := range nodeIDs {
|
||||||
|
@ -185,6 +192,8 @@ func runFileRetrievalTest(nodeCount int) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info("network healthy, start file checks")
|
||||||
|
|
||||||
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
|
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
|
||||||
// or until the timeout is reached.
|
// or until the timeout is reached.
|
||||||
REPEAT:
|
REPEAT:
|
||||||
|
@ -212,6 +221,8 @@ func runFileRetrievalTest(nodeCount int) error {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
log.Info("Simulation terminated")
|
||||||
|
|
||||||
if result.Error != nil {
|
if result.Error != nil {
|
||||||
return result.Error
|
return result.Error
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,8 +94,8 @@ func TestSyncingViaGlobalSync(t *testing.T) {
|
||||||
//if the `longrunning` flag has been provided
|
//if the `longrunning` flag has been provided
|
||||||
//run more test combinations
|
//run more test combinations
|
||||||
if *longrunning {
|
if *longrunning {
|
||||||
chunkCounts = []int{1, 8, 32, 256, 1024}
|
chunkCounts = []int{64, 128}
|
||||||
nodeCounts = []int{16, 32, 64, 128, 256}
|
nodeCounts = []int{32, 64}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, chunkCount := range chunkCounts {
|
for _, chunkCount := range chunkCounts {
|
||||||
|
|
|
@ -1188,12 +1188,13 @@ func TestGetSubscriptionsRPC(t *testing.T) {
|
||||||
|
|
||||||
// arbitrarily set to 4
|
// arbitrarily set to 4
|
||||||
nodeCount := 4
|
nodeCount := 4
|
||||||
|
// set the syncUpdateDelay for sync registrations to start
|
||||||
|
syncUpdateDelay := 200 * time.Millisecond
|
||||||
// run with more nodes if `longrunning` flag is set
|
// run with more nodes if `longrunning` flag is set
|
||||||
if *longrunning {
|
if *longrunning {
|
||||||
nodeCount = 64
|
nodeCount = 64
|
||||||
|
syncUpdateDelay = 10 * time.Second
|
||||||
}
|
}
|
||||||
// set the syncUpdateDelay for sync registrations to start
|
|
||||||
syncUpdateDelay := 200 * time.Millisecond
|
|
||||||
// holds the msg code for SubscribeMsg
|
// holds the msg code for SubscribeMsg
|
||||||
var subscribeMsgCode uint64
|
var subscribeMsgCode uint64
|
||||||
var ok bool
|
var ok bool
|
||||||
|
@ -1241,7 +1242,7 @@ func TestGetSubscriptionsRPC(t *testing.T) {
|
||||||
})
|
})
|
||||||
defer sim.Close()
|
defer sim.Close()
|
||||||
|
|
||||||
ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
|
ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||||
defer cancelSimRun()
|
defer cancelSimRun()
|
||||||
|
|
||||||
// upload a snapshot
|
// upload a snapshot
|
||||||
|
@ -1267,6 +1268,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
//for long running sims, waiting 1 sec will not be enough
|
//for long running sims, waiting 1 sec will not be enough
|
||||||
waitDuration := time.Duration(nodeCount/16) * time.Second
|
waitDuration := time.Duration(nodeCount/16) * time.Second
|
||||||
|
if *longrunning {
|
||||||
|
waitDuration = syncUpdateDelay
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -1328,11 +1332,11 @@ func TestGetSubscriptionsRPC(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.Debug("All node streams counted", "realCount", realCount)
|
||||||
}
|
}
|
||||||
// every node is mutually subscribed to each other, so the actual count is half of it
|
|
||||||
emc := expectedMsgCount.count()
|
emc := expectedMsgCount.count()
|
||||||
if realCount/2 != emc {
|
if realCount != emc {
|
||||||
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
|
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount, emc)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -173,10 +173,6 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// here we distribute chunks of a random file into stores 1...nodes
|
// here we distribute chunks of a random file into stores 1...nodes
|
||||||
if _, err := sim.WaitTillHealthy(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// collect hashes in po 1 bin for each node
|
// collect hashes in po 1 bin for each node
|
||||||
hashes := make([][]storage.Address, nodes)
|
hashes := make([][]storage.Address, nodes)
|
||||||
totalHashes := 0
|
totalHashes := 0
|
||||||
|
|
Loading…
Reference in New Issue