diff options
Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/stream/common_test.go | 4 | ||||
-rw-r--r-- | swarm/network/stream/delivery_test.go | 8 | ||||
-rw-r--r-- | swarm/network/stream/messages.go | 10 | ||||
-rw-r--r-- | swarm/network/stream/peer.go | 11 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 3 | ||||
-rw-r--r-- | swarm/network/stream/streamer_test.go | 181 |
6 files changed, 198 insertions, 19 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index e0d776e34..24de51158 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -84,7 +84,7 @@ func createGlobalStore() (string, *mockdb.GlobalStore, error) { return globalStoreDir, globalStore, nil } -func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { +func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address to := network.NewKademlia(addr.OAddr, network.NewKadParams()) @@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora delivery := NewDelivery(to, netStore) netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) + streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), registryOptions) teardown := func() { streamer.Close() removeDataDir() diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index ece54d4ee..016a8d15e 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -39,7 +39,7 @@ import ( ) func TestStreamerRetrieveRequest(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -75,7 +75,7 @@ func TestStreamerRetrieveRequest(t *testing.T) { } func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -127,7 +127,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // upstream request server receives a retrieve Request and responds with // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t) + tester, streamer, localStore, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -221,7 +221,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t) + tester, streamer, localStore, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 2e1a81e82..482af25f5 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -84,11 +84,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e defer func() { if err != nil { - if e := p.Send(context.TODO(), SubscribeErrorMsg{ + // The error will be sent as a subscribe error message + // and will not be returned as it will prevent any new message + // exchange between peers over p2p. Instead, error will be returned + // only if there is one from sending subscribe error message. + err = p.Send(context.TODO(), SubscribeErrorMsg{ Error: err.Error(), - }); e != nil { - log.Error("send stream subscribe error message", "err", err) - } + }) } }() diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 1466a7a9c..aeaf7bbfa 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -18,6 +18,7 @@ package stream import ( "context" + "errors" "fmt" "sync" "time" @@ -46,6 +47,10 @@ func (e *notFoundError) Error() string { return fmt.Sprintf("%s not found for stream %q", e.t, e.s) } +// ErrMaxPeerServers will be returned if peer server limit is reached. +// It will be sent in the SubscribeErrorMsg. +var ErrMaxPeerServers = errors.New("max peer servers") + // Peer is the Peer extension for the streaming protocol type Peer struct { *protocols.Peer @@ -204,6 +209,11 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) { if p.servers[s] != nil { return nil, fmt.Errorf("server %s already registered", s) } + + if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers { + return nil, ErrMaxPeerServers + } + os := &server{ Server: o, stream: s, @@ -346,6 +356,7 @@ func (p *Peer) removeClient(s Stream) error { return newNotFoundError("client", s) } client.close() + delete(p.clients, s) return nil } diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 319fc62c9..6dcf31165 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -60,6 +60,7 @@ type Registry struct { delivery *Delivery intervalsStore state.Store doRetrieve bool + maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. @@ -68,6 +69,7 @@ type RegistryOptions struct { DoSync bool DoRetrieve bool SyncUpdateDelay time.Duration + MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor @@ -87,6 +89,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore stora delivery: delivery, intervalsStore: intervalsStore, doRetrieve: options.DoRetrieve, + maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 06e96b9a9..4f480b0ac 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -19,6 +19,7 @@ package stream import ( "bytes" "context" + "strconv" "testing" "time" @@ -27,7 +28,7 @@ import ( ) func TestStreamerSubscribe(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -41,7 +42,7 @@ func TestStreamerSubscribe(t *testing.T) { } func TestStreamerRequestSubscription(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -125,7 +126,7 @@ func (self *testServer) Close() { } func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -218,7 +219,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -285,7 +286,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -351,7 +352,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { } func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -395,7 +396,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -460,7 +461,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { } func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -559,7 +560,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { } func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -685,3 +686,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { t.Fatal(err) } } + +// TestMaxPeerServersWithUnsubscribe creates a registry with a limited +// number of stream servers, and performs a test with subscriptions and +// unsubscriptions, checking if unsubscriptions will remove streams, +// leaving place for new streams. +func TestMaxPeerServersWithUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + peerID := tester.IDs[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "unsubscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 0, + Msg: &UnsubscribeMsg{ + Stream: stream, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} + +// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited +// number of stream servers, and performs subscriptions to detect sunscriptions +// error message exchange. +func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + peerID := tester.IDs[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + if i >= maxPeerServers { + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 7, + Msg: &SubscribeErrorMsg{ + Error: ErrMaxPeerServers.Error(), + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + continue + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: peerID, + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} |