diff options
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r-- | swarm/network/stream/delivery_test.go | 97 |
1 files changed, 69 insertions, 28 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 949645558..29b4f2f69 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -38,8 +38,13 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) +//Tests initializing a retrieve request func TestStreamerRetrieveRequest(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) + regOpts := &RegistryOptions{ + Retrieval: RetrievalClientOnly, + Syncing: SyncingDisabled, + } + tester, streamer, _, teardown, err := newStreamerTester(t, regOpts) defer teardown() if err != nil { t.Fatal(err) @@ -55,10 +60,21 @@ func TestStreamerRetrieveRequest(t *testing.T) { ) streamer.delivery.RequestFromPeers(ctx, req) + stream := NewStream(swarmChunkServerStreamName, "", true) + err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", Expects: []p2ptest.Expect{ - { + { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + { //expect a retrieve request message for the given hash Code: 5, Msg: &RetrieveRequestMsg{ Addr: hash0[:], @@ -74,9 +90,12 @@ func TestStreamerRetrieveRequest(t *testing.T) { } } +//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) +//Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, //do no syncing }) defer teardown() if err != nil { @@ -89,16 +108,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { peer := streamer.getPeer(node.ID()) + stream := NewStream(swarmChunkServerStreamName, "", true) + //simulate pre-subscription to RETRIEVE_REQUEST stream on peer peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: NewStream(swarmChunkServerStreamName, "", true), + Stream: stream, History: nil, Priority: Top, }) + //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { //first expect a subscription to the RETRIEVE_REQUEST stream + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ - { + { //then the actual RETRIEVE_REQUEST.... Code: 5, Msg: &RetrieveRequestMsg{ Addr: chunk.Address()[:], @@ -107,7 +141,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }, Expects: []p2ptest.Expect{ - { + { //to which the peer responds with offered hashes Code: 1, Msg: &OfferedHashesMsg{ HandoverProof: nil, @@ -120,7 +154,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }) - expectedError := `exchange #0 "RetrieveRequestMsg": timed out` + //should fail with a timeout as the peer we are requesting + //the chunk from does not have the chunk + expectedError := `exchange #1 "RetrieveRequestMsg": timed out` if err == nil || err.Error() != expectedError { t.Fatalf("Expected error %v, got %v", expectedError, err) } @@ -130,7 +166,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -138,6 +175,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } node := tester.Nodes[0] + peer := streamer.getPeer(node.ID()) stream := NewStream(swarmChunkServerStreamName, "", true) @@ -156,6 +194,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { @@ -226,7 +276,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -241,6 +292,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { node := tester.Nodes[0] + //subscribe to custom stream stream := NewStream("foo", "", true) err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { @@ -253,7 +305,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Expects: []p2ptest.Expect{ - { + { //first expect subscription to the custom stream... Code: 4, Msg: &SubscribeMsg{ Stream: stream, @@ -267,7 +319,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { p2ptest.Exchange{ Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ - { + { //...then trigger a chunk delivery for the given chunk from peer in order for + //local node to get the chunk delivered Code: 6, Msg: &ChunkDeliveryMsg{ Addr: chunkKey, @@ -342,8 +395,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - DoServeRetrieve: true, + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + Retrieval: RetrievalEnabled, }) bucket.Store(bucketKeyRegistry, r) @@ -408,20 +462,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck return err } - //each of the nodes (except pivot node) subscribes to the stream of the next node - for j, node := range nodeIDs[0 : nodes-1] { - sid := nodeIDs[j+1] - item, ok := sim.NodeItem(node, bucketKeyRegistry) - if !ok { - return fmt.Errorf("No registry") - } - registry := item.(*Registry) - err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - //get the pivot node's filestore item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore) if !ok { @@ -530,7 +570,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, - DoSync: true, + Syncing: SyncingDisabled, + Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }) |