diff options
Diffstat (limited to 'swarm/network/stream/delivery_test.go')
-rw-r--r-- | swarm/network/stream/delivery_test.go | 93 |
1 files changed, 48 insertions, 45 deletions
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 972cc859a..ece54d4ee 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) { peerID := tester.IDs[0] - streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true) + ctx := context.Background() + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + streamer.delivery.RequestFromPeers(ctx, req) err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { { Code: 5, Msg: &RetrieveRequestMsg{ - Addr: chunk.Addr[:], + Addr: chunk.Address()[:], }, Peer: peerID, }, @@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { }) hash := storage.Address(hash0[:]) - chunk := storage.NewChunk(hash, nil) - chunk.SData = hash - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk := storage.NewChunk(hash, hash) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } hash = storage.Address(hash1[:]) - chunk = storage.NewChunk(hash, nil) - chunk.SData = hash1[:] - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk = storage.NewChunk(hash, hash1[:]) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { chunkKey := hash0[:] chunkData := hash1[:] - chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey) - - if !created { - t.Fatal("chunk already exists") - } - select { - case <-chunk.ReqC: - t.Fatal("chunk is already received") - default: - } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { }, }, p2ptest.Exchange{ - Label: "ChunkDeliveryRequest message", + Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ { Code: 6, @@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { if err != nil { t.Fatalf("Expected no error, got %v", err) } - - timeout := time.NewTimer(1 * time.Second) - - select { - case <-timeout.C: - t.Fatal("timeout receiving chunk") - case <-chunk.ReqC: + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // wait for the chunk to get stored + storedChunk, err := localStore.Get(ctx, chunkKey) + for err != nil { + select { + case <-ctx.Done(): + t.Fatalf("Chunk is not in localstore after timeout, err: %v", err) + default: + } + storedChunk, err = localStore.Get(ctx, chunkKey) + time.Sleep(50 * time.Millisecond) } - storedChunk, err := localStore.Get(context.TODO(), chunkKey) if err != nil { t.Fatalf("Expected no error, got %v", err) } - if !bytes.Equal(storedChunk.SData, chunkData) { + if !bytes.Equal(storedChunk.Data(), chunkData) { t.Fatal("Retrieved chunk has different data than original") } @@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) @@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - id := ctx.Config.ID addr := network.NewAddrFromNodeID(id) store, datadir, err := createTestLocalStorageForID(id, addr) @@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, DoSync: true, SyncUpdateDelay: 0, }) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) |