author    holisticode <holistic.computing@gmail.com>  2019-05-11 18:55:06 +0800
committer Anton Evangelatov <anton.evangelatov@gmail.com>  2019-05-11 18:55:06 +0800
commit    77442418865a58ae5888e4b7113608031a237006 (patch)
tree      3908d0de7ab0437cdee1b9aa80bbed704290a94c /swarm/network
parent    6ec6b290511a7433a36a40d1e8c2cca3d9613d6f (diff)
swarm/network/stream: add pure retrieval test (#19552)
Diffstat (limited to 'swarm/network')
-rw-r--r--  swarm/network/stream/snapshot_retrieval_test.go  |  273
1 file changed, 218 insertions(+), 55 deletions(-)
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index e34f87951..50617b5cf 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -16,8 +16,10 @@
package stream
import (
+ "bytes"
"context"
"fmt"
+ "io"
"sync"
"testing"
"time"
@@ -33,17 +35,17 @@ import (
"github.com/ethereum/go-ethereum/swarm/testutil"
)
-//constants for random file generation
+// constants for random file generation
const (
minFileSize = 2
maxFileSize = 40
)
-//This test is a retrieval test for nodes.
-//A configurable number of nodes can be
-//provided to the test.
-//Files are uploaded to nodes, other nodes try to retrieve the file
-//Number of nodes can be provided via commandline too.
+// TestFileRetrieval is a retrieval test for nodes.
+// A configurable number of nodes can be
+// provided to the test.
+// Files are uploaded to nodes; other nodes then try to retrieve them.
+// The number of nodes can also be provided via the command line.
func TestFileRetrieval(t *testing.T) {
var nodeCount []int
@@ -61,26 +63,57 @@ func TestFileRetrieval(t *testing.T) {
}
for _, nc := range nodeCount {
- if err := runFileRetrievalTest(nc); err != nil {
- t.Error(err)
+ runFileRetrievalTest(t, nc)
+ }
+}
+
+// TestPureRetrieval tests pure retrieval without syncing.
+// A configurable number of nodes and chunks
+// can be provided to the test.
+// A number of random chunks is generated, then stored directly in
+// each node's localstore according to the chunk address.
+// Each chunk is thus expected to end up at certain nodes;
+// with retrieval we then make sure that every node can actually retrieve
+// the chunks.
+func TestPureRetrieval(t *testing.T) {
+ var nodeCount []int
+ var chunkCount []int
+
+ if *nodes != 0 && *chunks != 0 {
+ nodeCount = []int{*nodes}
+ chunkCount = []int{*chunks}
+ } else {
+ nodeCount = []int{16}
+ chunkCount = []int{150}
+
+ if *longrunning {
+ nodeCount = append(nodeCount, 32, 64)
+ chunkCount = append(chunkCount, 32, 256)
+ } else if testutil.RaceEnabled {
+ nodeCount = []int{4}
+ chunkCount = []int{4}
+ }
+
+ }
+
+ for _, nc := range nodeCount {
+ for _, c := range chunkCount {
+ runPureRetrievalTest(t, nc, c)
}
}
}
-//This test is a retrieval test for nodes.
-//One node is randomly selected to be the pivot node.
-//A configurable number of chunks and nodes can be
-//provided to the test, the number of chunks is uploaded
-//to the pivot node and other nodes try to retrieve the chunk(s).
-//Number of chunks and nodes can be provided via commandline too.
+// TestRetrieval tests retrieval of chunks by random nodes.
+// One node is randomly selected to be the pivot node.
+// A configurable number of chunks and nodes can be
+// provided to the test; the chunks are uploaded
+// to the pivot node and other nodes try to retrieve them.
+// The number of chunks and nodes can also be provided via the command line.
func TestRetrieval(t *testing.T) {
- //if nodes/chunks have been provided via commandline,
- //run the tests with these values
+ // if nodes/chunks have been provided via commandline,
+ // run the tests with these values
if *nodes != 0 && *chunks != 0 {
- err := runRetrievalTest(t, *chunks, *nodes)
- if err != nil {
- t.Fatal(err)
- }
+ runRetrievalTest(t, *chunks, *nodes)
} else {
nodeCnt := []int{16}
chnkCnt := []int{32}
@@ -96,10 +129,7 @@ func TestRetrieval(t *testing.T) {
for _, n := range nodeCnt {
for _, c := range chnkCnt {
t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
- err := runRetrievalTest(t, c, n)
- if err != nil {
- t.Fatal(err)
- }
+ runRetrievalTest(t, c, n)
})
}
}
@@ -132,15 +162,160 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
},
}
-/*
-The test loads a snapshot file to construct the swarm network,
-assuming that the snapshot file identifies a healthy
-kademlia network. Nevertheless a health check runs in the
-simulation's `action` function.
+// runPureRetrievalTest uploads a snapshot, starts a simulation,
+// distributes chunks directly to the nodes
+// and then starts retrieval.
+// The snapshot should have 'streamer' in its service list.
+func runPureRetrievalTest(t *testing.T, nodeCount int, chunkCount int) {
+
+ t.Helper()
+ // the pure retrieval test needs a different service map, as we want
+ // syncing disabled and we don't need to set the syncUpdateDelay
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Syncing: SyncingDisabled,
+ }, nil)
+
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
+ },
+ )
+ defer sim.Close()
+
+ log.Info("Initializing test config", "node count", nodeCount)
+
+ conf := &synctestConfig{}
+ //map of discover ID to indexes of chunks expected at that ID
+ conf.idToChunksMap = make(map[enode.ID][]int)
+ //map of overlay address to discover ID
+ conf.addrToIDMap = make(map[string]enode.ID)
+ //array where the generated chunk hashes will be stored
+ conf.hashes = make([]storage.Address, 0)
+
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
+ defer cancelSimRun()
+
+ filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
+ err := sim.UploadSnapshot(ctx, filename)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ log.Info("Starting simulation")
+
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ // first iteration: create addresses
+ for _, n := range nodeIDs {
+ //get the kademlia overlay address from this ID
+ a := n.Bytes()
+ //append it to the array of all overlay addresses
+ conf.addrs = append(conf.addrs, a)
+ //the proximity calculation is on overlay addr,
+ //the p2p/simulations check func triggers on enode.ID,
+ //so we need to know which overlay addr maps to which nodeID
+ conf.addrToIDMap[string(a)] = n
+ }
+
+ // now create random chunks
+ chunks := storage.GenerateRandomChunks(int64(chunkSize), chunkCount)
+ for _, chunk := range chunks {
+ conf.hashes = append(conf.hashes, chunk.Address())
+ }
+
+ log.Debug("random chunks generated, mapping keys to nodes")
+
+ // map addresses to nodes
+ mapKeysToNodes(conf)
+
+ // second iteration: storing chunks at the peer whose
+ // overlay address is closest to a particular chunk's hash
+ log.Debug("storing every chunk at correspondent node store")
+ for _, id := range nodeIDs {
+ // for every chunk for this node (which are only indexes)...
+ for _, ch := range conf.idToChunksMap[id] {
+ item, ok := sim.NodeItem(id, bucketKeyStore)
+ if !ok {
+ return fmt.Errorf("Error accessing localstore")
+ }
+ lstore := item.(chunk.Store)
+ // ...get the actual chunk
+ for _, chnk := range chunks {
+ if bytes.Equal(chnk.Address(), conf.hashes[ch]) {
+ // ...and store it in the localstore
+ if _, err = lstore.Put(ctx, chunk.ModePutUpload, chnk); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ }
+
+ // now try to retrieve every chunk from every node
+ log.Debug("starting retrieval")
+ cnt := 0
+
+ for _, id := range nodeIDs {
+ item, ok := sim.NodeItem(id, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
+ }
+ fileStore := item.(*storage.FileStore)
+ for _, chunk := range chunks {
+ reader, _ := fileStore.Retrieve(context.TODO(), chunk.Address())
+ content := make([]byte, chunkSize)
+ size, err := reader.Read(content)
+ //check chunk size and content
+ ok := true
+ if err != io.EOF {
+ log.Debug("Retrieve error", "err", err, "hash", chunk.Address(), "nodeId", id)
+ ok = false
+ }
+ if size != chunkSize {
+ log.Debug("size not equal chunkSize", "size", size, "hash", chunk.Address(), "nodeId", id)
+ ok = false
+ }
+ // skip the 8-byte chunk "metadata" prefix of chunk.Data()
+ if !bytes.Equal(content, chunk.Data()[8:]) {
+ log.Debug("content not equal chunk data", "hash", chunk.Address(), "nodeId", id)
+ ok = false
+ }
+ if !ok {
+ return fmt.Errorf("Expected test to succeed at first run, but failed with chunk not found")
+ }
+ log.Debug(fmt.Sprintf("chunk with root hash %x successfully retrieved", chunk.Address()))
+ cnt++
+ }
+ }
+ log.Info("retrieval terminated, chunks retrieved: ", "count", cnt)
+ return nil
+
+ })
+
+ log.Info("Simulation terminated")
+
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+}
+
+// runFileRetrievalTest loads a snapshot file to construct the swarm network.
+// The snapshot should have 'streamer' in its service list.
+func runFileRetrievalTest(t *testing.T, nodeCount int) {
+
+ t.Helper()
-The snapshot should have 'streamer' in its service list.
-*/
-func runFileRetrievalTest(nodeCount int) error {
sim := simulation.New(retrievalSimServiceMap)
defer sim.Close()
@@ -160,7 +335,7 @@ func runFileRetrievalTest(nodeCount int) error {
filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
err := sim.UploadSnapshot(ctx, filename)
if err != nil {
- return err
+ t.Fatal(err)
}
log.Info("Starting simulation")
@@ -180,9 +355,6 @@ func runFileRetrievalTest(nodeCount int) error {
//an array for the random files
var randomFiles []string
- //channel to signal when the upload has finished
- //uploadFinished := make(chan struct{})
- //channel to trigger new node checks
conf.hashes, randomFiles, err = uploadFilesToNodes(sim)
if err != nil {
@@ -221,24 +393,17 @@ func runFileRetrievalTest(nodeCount int) error {
log.Info("Simulation terminated")
if result.Error != nil {
- return result.Error
+ t.Fatal(result.Error)
}
-
- return nil
}
-/*
-The test generates the given number of chunks.
+// runRetrievalTest generates the given number of chunks.
+// The test loads a snapshot file to construct the swarm network.
+// The snapshot should have 'streamer' in its service list.
+func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) {
-The test loads a snapshot file to construct the swarm network,
-assuming that the snapshot file identifies a healthy
-kademlia network. Nevertheless a health check runs in the
-simulation's `action` function.
-
-The snapshot should have 'streamer' in its service list.
-*/
-func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
t.Helper()
+
sim := simulation.New(retrievalSimServiceMap)
defer sim.Close()
@@ -256,7 +421,7 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
err := sim.UploadSnapshot(ctx, filename)
if err != nil {
- return err
+ t.Fatal(err)
}
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
@@ -278,8 +443,8 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
if !ok {
return fmt.Errorf("No localstore")
}
- store := item.(chunk.Store)
- conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, store)
+ lstore := item.(chunk.Store)
+ conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
if err != nil {
return err
}
@@ -314,8 +479,6 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
})
if result.Error != nil {
- return result.Error
+ t.Fatal(result.Error)
}
-
- return nil
}
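
The new TestPureRetrieval relies on every generated chunk being stored at the node whose overlay address is closest to the chunk hash; that assignment is what mapKeysToNodes computes before the chunks are put into the individual localstores. Below is a minimal, self-contained sketch of that proximity mapping; the helpers proximity and closestNode are hypothetical names invented for this illustration and are not part of the patch or of the swarm packages.

// Illustrative sketch only, not part of the commit.
package main

import (
	"crypto/rand"
	"fmt"
)

// proximity returns the number of leading bits shared by a and b,
// i.e. the kademlia proximity order of the two equal-length addresses.
func proximity(a, b []byte) int {
	for i := range a {
		x := a[i] ^ b[i]
		if x == 0 {
			continue
		}
		for j := 7; j >= 0; j-- {
			if x&(1<<uint(j)) != 0 {
				return i*8 + (7 - j)
			}
		}
	}
	return len(a) * 8
}

// closestNode returns the node address with the highest proximity to the
// chunk address; that node is the one expected to end up storing the chunk.
func closestNode(chunkAddr []byte, nodes [][]byte) []byte {
	best := nodes[0]
	for _, n := range nodes[1:] {
		if proximity(chunkAddr, n) > proximity(chunkAddr, best) {
			best = n
		}
	}
	return best
}

func main() {
	// four random 32-byte overlay addresses standing in for node addresses
	nodes := make([][]byte, 4)
	for i := range nodes {
		nodes[i] = make([]byte, 32)
		rand.Read(nodes[i])
	}

	// a random chunk address
	chunkAddr := make([]byte, 32)
	rand.Read(chunkAddr)

	owner := closestNode(chunkAddr, nodes)
	fmt.Printf("chunk %x is expected at node %x (po %d)\n",
		chunkAddr[:4], owner[:4], proximity(chunkAddr, owner))
}

In the test itself the proximity calculation runs on the overlay addresses derived from the enode IDs, which is why conf.addrToIDMap keeps the translation between overlay address and enode.ID.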
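
A second detail worth spelling out: the retrieval check in runPureRetrievalTest compares the bytes read from the FileStore reader against chunk.Data()[8:]. The sketch below shows why the first 8 bytes are skipped, under the assumption that a stored chunk's data starts with an 8-byte little-endian span (length) prefix followed by the payload; makeChunkData is a hypothetical helper that only mimics that layout.

// Illustrative sketch only, not part of the commit.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// makeChunkData mimics the assumed layout of stored chunk data:
// an 8-byte little-endian span prefix followed by the payload.
func makeChunkData(payload []byte) []byte {
	data := make([]byte, 8+len(payload))
	binary.LittleEndian.PutUint64(data[:8], uint64(len(payload)))
	copy(data[8:], payload)
	return data
}

func main() {
	payload := []byte("hello swarm")
	data := makeChunkData(payload)

	// what the test does: skip the 8-byte "metadata" before comparing
	fmt.Println(bytes.Equal(data[8:], payload)) // prints true
}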