diff options
Diffstat (limited to 'swarm/network/stream/streamer_test.go')
-rw-r--r-- | swarm/network/stream/streamer_test.go | 151 |
1 files changed, 109 insertions, 42 deletions
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 06e96b9a9..ba4328eef 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -19,6 +19,7 @@ package stream import ( "bytes" "context" + "errors" "testing" "time" @@ -34,7 +35,7 @@ func TestStreamerSubscribe(t *testing.T) { } stream := NewStream("foo", "", true) - err = streamer.Subscribe(tester.IDs[0], stream, NewRange(0, 0), Top) + err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top) if err == nil || err.Error() != "stream foo not registered" { t.Fatalf("Expected error %v, got %v", "stream foo not registered", err) } @@ -48,18 +49,19 @@ func TestStreamerRequestSubscription(t *testing.T) { } stream := NewStream("foo", "", false) - err = streamer.RequestSubscription(tester.IDs[0], stream, &Range{}, Top) + err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top) if err == nil || err.Error() != "stream foo not registered" { t.Fatalf("Expected error %v, got %v", "stream foo not registered", err) } } var ( - hash0 = sha3.Sum256([]byte{0}) - hash1 = sha3.Sum256([]byte{1}) - hash2 = sha3.Sum256([]byte{2}) - hashesTmp = append(hash0[:], hash1[:]...) - hashes = append(hashesTmp, hash2[:]...) + hash0 = sha3.Sum256([]byte{0}) + hash1 = sha3.Sum256([]byte{1}) + hash2 = sha3.Sum256([]byte{2}) + hashesTmp = append(hash0[:], hash1[:]...) + hashes = append(hashesTmp, hash2[:]...) + corruptHashes = append(hashes[:40]) ) type testClient struct { @@ -135,10 +137,10 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { return newTestClient(t), nil }) - peerID := tester.IDs[0] + node := tester.Nodes[0] stream := NewStream("foo", "", true) - err = streamer.Subscribe(peerID, stream, NewRange(5, 8), Top) + err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -154,7 +156,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { History: NewRange(5, 8), Priority: Top, }, - Peer: peerID, + Peer: node.ID(), }, }, }, @@ -173,7 +175,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { To: 8, Stream: stream, }, - Peer: peerID, + Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ @@ -185,7 +187,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { From: 9, To: 0, }, - Peer: peerID, + Peer: node.ID(), }, }, }, @@ -194,7 +196,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { t.Fatal(err) } - err = streamer.Unsubscribe(peerID, stream) + err = streamer.Unsubscribe(node.ID(), stream) if err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -207,7 +209,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { Msg: &UnsubscribeMsg{ Stream: stream, }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -230,7 +232,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { return newTestServer(t), nil }) - peerID := tester.IDs[0] + node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -242,7 +244,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { History: NewRange(5, 8), Priority: Top, }, - Peer: peerID, + Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ @@ -257,7 +259,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { From: 6, To: 9, }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -274,7 +276,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { Msg: &UnsubscribeMsg{ Stream: stream, }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -297,7 +299,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { return newTestServer(t), nil }) - peerID := tester.IDs[0] + node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -308,7 +310,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { Stream: stream, Priority: Top, }, - Peer: peerID, + Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ @@ -323,7 +325,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { From: 1, To: 1, }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -340,7 +342,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { Msg: &UnsubscribeMsg{ Stream: stream, }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -363,7 +365,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { stream := NewStream("bar", "", true) - peerID := tester.IDs[0] + node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -375,7 +377,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { History: NewRange(5, 8), Priority: Top, }, - Peer: peerID, + Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ @@ -384,7 +386,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { Msg: &SubscribeErrorMsg{ Error: "stream bar not registered", }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -409,7 +411,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { }, nil }) - peerID := tester.IDs[0] + node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -421,7 +423,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { History: NewRange(5, 8), Priority: Top, }, - Peer: peerID, + Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ @@ -436,7 +438,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { From: 6, To: 9, }, - Peer: peerID, + Peer: node.ID(), }, { Code: 1, @@ -449,7 +451,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { To: 1, Hashes: make([]byte, HashSize), }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -459,7 +461,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { } } -func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { +func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t) defer teardown() if err != nil { @@ -497,6 +499,71 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { }, }, p2ptest.Exchange{ + Label: "Corrupt offered hash message", + Triggers: []p2ptest.Trigger{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: corruptHashes, + From: 5, + To: 8, + Stream: stream, + }, + Peer: peerID, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)") + if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.IDs[0], Error: expectedError}); err != nil { + t.Fatal(err) + } +} + +func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { + tester, streamer, _, teardown, err := newStreamerTester(t) + defer teardown() + if err != nil { + t.Fatal(err) + } + + stream := NewStream("foo", "", true) + + var tc *testClient + + streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { + tc = newTestClient(t) + return tc, nil + }) + + node := tester.Nodes[0] + + err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Expects: []p2ptest.Expect{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: NewRange(5, 8), + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, + p2ptest.Exchange{ Label: "WantedHashes message", Triggers: []p2ptest.Trigger{ { @@ -510,7 +577,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { To: 8, Stream: stream, }, - Peer: peerID, + Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ @@ -522,7 +589,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { From: 9, To: 0, }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -569,10 +636,10 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { return newTestServer(t), nil }) - peerID := tester.IDs[0] + node := tester.Nodes[0] stream := NewStream("foo", "", true) - err = streamer.RequestSubscription(peerID, stream, NewRange(5, 8), Top) + err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top) if err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -588,7 +655,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { History: NewRange(5, 8), Priority: Top, }, - Peer: peerID, + Peer: node.ID(), }, }, }, @@ -602,7 +669,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { History: NewRange(5, 8), Priority: Top, }, - Peer: peerID, + Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ @@ -617,7 +684,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { From: 6, To: 9, }, - Peer: peerID, + Peer: node.ID(), }, { Code: 1, @@ -630,7 +697,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { To: 1, Hashes: make([]byte, HashSize), }, - Peer: peerID, + Peer: node.ID(), }, }, }, @@ -639,7 +706,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { t.Fatal(err) } - err = streamer.Quit(peerID, stream) + err = streamer.Quit(node.ID(), stream) if err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -652,7 +719,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { Msg: &QuitMsg{ Stream: stream, }, - Peer: peerID, + Peer: node.ID(), }, }, }) @@ -663,7 +730,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { historyStream := getHistoryStream(stream) - err = streamer.Quit(peerID, historyStream) + err = streamer.Quit(node.ID(), historyStream) if err != nil { t.Fatalf("Expected no error, got %v", err) } @@ -676,7 +743,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { Msg: &QuitMsg{ Stream: historyStream, }, - Peer: peerID, + Peer: node.ID(), }, }, }) |