diff options
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r-- | swarm/network/stream/delivery_test.go | 75 |
1 files changed, 20 insertions, 55 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index cb7690f3e..e5821df4f 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "testing" "time" @@ -48,10 +47,10 @@ func TestStreamerRetrieveRequest(t *testing.T) { Syncing: SyncingDisabled, } tester, streamer, _, teardown, err := newStreamerTester(regOpts) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -100,10 +99,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { Retrieval: RetrievalEnabled, Syncing: SyncingDisabled, //do no syncing }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -172,10 +171,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { Retrieval: RetrievalEnabled, Syncing: SyncingDisabled, }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() node := tester.Nodes[0] @@ -362,10 +361,10 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, }) - defer teardown() if err != nil { t.Fatal(err) } + defer teardown() streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { return &testClient{ @@ -485,7 +484,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) } log.Info("Starting simulation") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { nodeIDs := sim.UpNodeIDs() //determine the pivot node to be the first node of the simulation @@ -548,27 +548,10 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) retErrC <- err }() - log.Debug("Watching for disconnections") - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - - var disconnected atomic.Value - go func() { - for d := range disconnections { - if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - disconnected.Store(true) - } - } - }() + disconnected := watchDisconnections(ctx, sim) defer func() { - if err != nil { - if yes, ok := disconnected.Load().(bool); ok && yes { - err = errors.New("disconnect events received") - } + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") } }() @@ -589,7 +572,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) return fmt.Errorf("Test failed, chunks not available on all nodes") } if err := <-retErrC; err != nil { - t.Fatalf("requesting chunks: %v", err) + return fmt.Errorf("requesting chunks: %v", err) } log.Debug("Test terminated successfully") return nil @@ -657,21 +640,22 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b b.Fatal(err) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { nodeIDs := sim.UpNodeIDs() node := nodeIDs[len(nodeIDs)-1] item, ok := sim.NodeItem(node, bucketKeyFileStore) if !ok { - b.Fatal("No filestore") + return errors.New("No filestore") } remoteFileStore := item.(*storage.FileStore) pivotNode := nodeIDs[0] item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore) if !ok { - b.Fatal("No filestore") + return errors.New("No filestore") } netStore := item.(*storage.NetStore) @@ -679,26 +663,10 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b return err } - disconnections := sim.PeerEvents( - context.Background(), - sim.NodeIDs(), - simulation.NewPeerEventsFilter().Drop(), - ) - - var disconnected atomic.Value - go func() { - for d := range disconnections { - if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) - disconnected.Store(true) - } - } - }() + disconnected := watchDisconnections(ctx, sim) defer func() { - if err != nil { - if yes, ok := disconnected.Load().(bool); ok && yes { - err = errors.New("disconnect events received") - } + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") } }() // benchmark loop @@ -713,12 +681,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b ctx := context.TODO() hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false) if err != nil { - b.Fatalf("expected no error. got %v", err) + return fmt.Errorf("store: %v", err) } // wait until all chunks stored err = wait(ctx) if err != nil { - b.Fatalf("expected no error. got %v", err) + return fmt.Errorf("wait store: %v", err) } // collect the hashes hashes[i] = hash @@ -754,10 +722,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b break Loop } } - if err != nil { - b.Fatal(err) - } - return nil + return err }) if result.Error != nil { b.Fatal(result.Error) |