diff options
Diffstat (limited to 'swarm/network/stream/common_test.go')
-rw-r--r-- | swarm/network/stream/common_test.go | 62 |
1 files changed, 58 insertions, 4 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 8a7d851fb..afd08d275 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -151,7 +151,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste // temp datadir datadir, err := ioutil.TempDir("", "streamer") if err != nil { - return nil, nil, nil, func() {}, err + return nil, nil, nil, nil, err } removeDataDir := func() { os.RemoveAll(datadir) @@ -163,12 +163,14 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste localStore, err := storage.NewTestLocalStoreForAddr(params) if err != nil { - return nil, nil, nil, removeDataDir, err + removeDataDir() + return nil, nil, nil, nil, err } netStore, err := storage.NewNetStore(localStore, nil) if err != nil { - return nil, nil, nil, removeDataDir, err + removeDataDir() + return nil, nil, nil, nil, err } delivery := NewDelivery(to, netStore) @@ -180,8 +182,9 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste } protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol) - err = waitForPeers(streamer, 1*time.Second, 1) + err = waitForPeers(streamer, 10*time.Second, 1) if err != nil { + teardown() return nil, nil, nil, nil, errors.New("timeout: peer is not created") } @@ -317,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch } return store, datadir, nil } + +// watchDisconnections receives simulation peer events in a new goroutine and sets atomic value +// disconnected to true in case of a disconnect event. +func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) { + log.Debug("Watching for disconnections") + disconnections := sim.PeerEvents( + ctx, + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Drop(), + ) + disconnected = new(boolean) + go func() { + for { + select { + case <-ctx.Done(): + return + case d := <-disconnections: + if d.Error != nil { + log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error) + } else { + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) + } + disconnected.set(true) + } + } + }() + return disconnected +} + +// boolean is used to concurrently set +// and read a boolean value. +type boolean struct { + v bool + mu sync.RWMutex +} + +// set sets the value. +func (b *boolean) set(v bool) { + b.mu.Lock() + defer b.mu.Unlock() + + b.v = v +} + +// bool reads the value. +func (b *boolean) bool() bool { + b.mu.RLock() + defer b.mu.RUnlock() + + return b.v +} |