aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/streamer_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/streamer_test.go')
-rw-r--r--swarm/network/stream/streamer_test.go151
1 files changed, 109 insertions, 42 deletions
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 06e96b9a9..ba4328eef 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -19,6 +19,7 @@ package stream
import (
"bytes"
"context"
+ "errors"
"testing"
"time"
@@ -34,7 +35,7 @@ func TestStreamerSubscribe(t *testing.T) {
}
stream := NewStream("foo", "", true)
- err = streamer.Subscribe(tester.IDs[0], stream, NewRange(0, 0), Top)
+ err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
if err == nil || err.Error() != "stream foo not registered" {
t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
}
@@ -48,18 +49,19 @@ func TestStreamerRequestSubscription(t *testing.T) {
}
stream := NewStream("foo", "", false)
- err = streamer.RequestSubscription(tester.IDs[0], stream, &Range{}, Top)
+ err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
if err == nil || err.Error() != "stream foo not registered" {
t.Fatalf("Expected error %v, got %v", "stream foo not registered", err)
}
}
var (
- hash0 = sha3.Sum256([]byte{0})
- hash1 = sha3.Sum256([]byte{1})
- hash2 = sha3.Sum256([]byte{2})
- hashesTmp = append(hash0[:], hash1[:]...)
- hashes = append(hashesTmp, hash2[:]...)
+ hash0 = sha3.Sum256([]byte{0})
+ hash1 = sha3.Sum256([]byte{1})
+ hash2 = sha3.Sum256([]byte{2})
+ hashesTmp = append(hash0[:], hash1[:]...)
+ hashes = append(hashesTmp, hash2[:]...)
+ corruptHashes = append(hashes[:40])
)
type testClient struct {
@@ -135,10 +137,10 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
return newTestClient(t), nil
})
- peerID := tester.IDs[0]
+ node := tester.Nodes[0]
stream := NewStream("foo", "", true)
- err = streamer.Subscribe(peerID, stream, NewRange(5, 8), Top)
+ err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -154,7 +156,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
History: NewRange(5, 8),
Priority: Top,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
},
@@ -173,7 +175,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
To: 8,
Stream: stream,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
@@ -185,7 +187,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
From: 9,
To: 0,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
},
@@ -194,7 +196,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
t.Fatal(err)
}
- err = streamer.Unsubscribe(peerID, stream)
+ err = streamer.Unsubscribe(node.ID(), stream)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -207,7 +209,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
Msg: &UnsubscribeMsg{
Stream: stream,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -230,7 +232,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
return newTestServer(t), nil
})
- peerID := tester.IDs[0]
+ node := tester.Nodes[0]
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -242,7 +244,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
History: NewRange(5, 8),
Priority: Top,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
@@ -257,7 +259,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
From: 6,
To: 9,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -274,7 +276,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
Msg: &UnsubscribeMsg{
Stream: stream,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -297,7 +299,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
return newTestServer(t), nil
})
- peerID := tester.IDs[0]
+ node := tester.Nodes[0]
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -308,7 +310,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
Stream: stream,
Priority: Top,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
@@ -323,7 +325,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
From: 1,
To: 1,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -340,7 +342,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
Msg: &UnsubscribeMsg{
Stream: stream,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -363,7 +365,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
stream := NewStream("bar", "", true)
- peerID := tester.IDs[0]
+ node := tester.Nodes[0]
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -375,7 +377,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
History: NewRange(5, 8),
Priority: Top,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
@@ -384,7 +386,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
Msg: &SubscribeErrorMsg{
Error: "stream bar not registered",
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -409,7 +411,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}, nil
})
- peerID := tester.IDs[0]
+ node := tester.Nodes[0]
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -421,7 +423,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
History: NewRange(5, 8),
Priority: Top,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
@@ -436,7 +438,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
From: 6,
To: 9,
},
- Peer: peerID,
+ Peer: node.ID(),
},
{
Code: 1,
@@ -449,7 +451,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
To: 1,
Hashes: make([]byte, HashSize),
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -459,7 +461,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
}
-func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
+func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
defer teardown()
if err != nil {
@@ -497,6 +499,71 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
},
},
p2ptest.Exchange{
+ Label: "Corrupt offered hash message",
+ Triggers: []p2ptest.Trigger{
+ {
+ Code: 1,
+ Msg: &OfferedHashesMsg{
+ HandoverProof: &HandoverProof{
+ Handover: &Handover{},
+ },
+ Hashes: corruptHashes,
+ From: 5,
+ To: 8,
+ Stream: stream,
+ },
+ Peer: peerID,
+ },
+ },
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
+ if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.IDs[0], Error: expectedError}); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
+ tester, streamer, _, teardown, err := newStreamerTester(t)
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ stream := NewStream("foo", "", true)
+
+ var tc *testClient
+
+ streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
+ tc = newTestClient(t)
+ return tc, nil
+ })
+
+ node := tester.Nodes[0]
+
+ err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ err = tester.TestExchanges(p2ptest.Exchange{
+ Label: "Subscribe message",
+ Expects: []p2ptest.Expect{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: NewRange(5, 8),
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ },
+ p2ptest.Exchange{
Label: "WantedHashes message",
Triggers: []p2ptest.Trigger{
{
@@ -510,7 +577,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
To: 8,
Stream: stream,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
@@ -522,7 +589,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
From: 9,
To: 0,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -569,10 +636,10 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
return newTestServer(t), nil
})
- peerID := tester.IDs[0]
+ node := tester.Nodes[0]
stream := NewStream("foo", "", true)
- err = streamer.RequestSubscription(peerID, stream, NewRange(5, 8), Top)
+ err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -588,7 +655,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
History: NewRange(5, 8),
Priority: Top,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
},
@@ -602,7 +669,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
History: NewRange(5, 8),
Priority: Top,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
@@ -617,7 +684,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
From: 6,
To: 9,
},
- Peer: peerID,
+ Peer: node.ID(),
},
{
Code: 1,
@@ -630,7 +697,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
To: 1,
Hashes: make([]byte, HashSize),
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
},
@@ -639,7 +706,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
t.Fatal(err)
}
- err = streamer.Quit(peerID, stream)
+ err = streamer.Quit(node.ID(), stream)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -652,7 +719,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Msg: &QuitMsg{
Stream: stream,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})
@@ -663,7 +730,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
historyStream := getHistoryStream(stream)
- err = streamer.Quit(peerID, historyStream)
+ err = streamer.Quit(node.ID(), historyStream)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -676,7 +743,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Msg: &QuitMsg{
Stream: historyStream,
},
- Peer: peerID,
+ Peer: node.ID(),
},
},
})