aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/snapshot_sync_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/snapshot_sync_test.go')
-rw-r--r--swarm/network/stream/snapshot_sync_test.go150
1 files changed, 83 insertions, 67 deletions
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index d93afce1b..0d5849487 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
@@ -50,6 +51,17 @@ type synctestConfig struct {
addrToIDMap map[string]enode.ID
}
+const (
+ // EventTypeNode is the type of event emitted when a node is either
+ // created, started or stopped
+ EventTypeChunkCreated simulations.EventType = "chunkCreated"
+ EventTypeChunkOffered simulations.EventType = "chunkOffered"
+ EventTypeChunkWanted simulations.EventType = "chunkWanted"
+ EventTypeChunkDelivered simulations.EventType = "chunkDelivered"
+ EventTypeChunkArrived simulations.EventType = "chunkArrived"
+ EventTypeSimTerminated simulations.EventType = "simTerminated"
+)
+
// Tests in this file should not request chunks from peers.
// This function will panic indicating that there is a problem if request has been made.
func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
@@ -131,41 +143,46 @@ func TestSyncingViaDirectSubscribe(t *testing.T) {
}
}
-func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
+var simServiceMap = map[string]simulation.ServiceFunc{
+ "streamer": streamerFunc,
+}
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- DoSync: true,
- SyncUpdateDelay: 3 * time.Second,
- })
- bucket.Store(bucketKeyRegistry, r)
+func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ n := ctx.Config.Node()
+ addr := network.NewAddr(n)
+ store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyStore, store)
+ localStore := store.(*storage.LocalStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ DoSync: true,
+ SyncUpdateDelay: 3 * time.Second,
+ })
- return r, cleanup, nil
+ bucket.Store(bucketKeyRegistry, r)
- },
- })
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
+ return r, cleanup, nil
+
+}
+
+func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
+ sim := simulation.New(simServiceMap)
defer sim.Close()
log.Info("Initializing test config")
@@ -204,7 +221,17 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
}
}()
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ result := runSim(conf, ctx, sim, chunkCount)
+
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+ log.Info("Simulation ended")
+}
+
+func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result {
+
+ return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
//get the kademlia overlay address from this ID
@@ -229,12 +256,19 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
if err != nil {
return err
}
+ for _, h := range hashes {
+ evt := &simulations.Event{
+ Type: EventTypeChunkCreated,
+ Node: sim.Net.GetNode(node.ID),
+ Data: h.String(),
+ }
+ sim.Net.Events().Send(evt)
+ }
conf.hashes = append(conf.hashes, hashes...)
mapKeysToNodes(conf)
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
- allSuccess := false
var gDir string
var globalStore *mockdb.GlobalStore
if *useMockStore {
@@ -250,12 +284,11 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
}
}()
}
- for !allSuccess {
- allSuccess = true
+ REPEAT:
+ for {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
- localSuccess := true
for _, ch := range localChunks {
//get the real chunk by the index in the index array
chunk := conf.hashes[ch]
@@ -277,29 +310,22 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
}
if err != nil {
log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
- localSuccess = false
// Do not get crazy with logging the warn message
time.Sleep(500 * time.Millisecond)
- } else {
- log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+ continue REPEAT
}
- }
- if !localSuccess {
- allSuccess = false
- break
+ evt := &simulations.Event{
+ Type: EventTypeChunkArrived,
+ Node: sim.Net.GetNode(id),
+ Data: chunk.String(),
+ }
+ sim.Net.Events().Send(evt)
+ log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
+ return nil
}
- if !allSuccess {
- return fmt.Errorf("Not all chunks succeeded!")
- }
- return nil
})
-
- if result.Error != nil {
- t.Fatal(result.Error)
- }
- log.Info("Simulation ended")
}
/*
@@ -459,13 +485,11 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
}
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
- allSuccess := false
- for !allSuccess {
- allSuccess = true
+ REPEAT:
+ for {
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
- localSuccess := true
for _, ch := range localChunks {
//get the real chunk by the index in the index array
chunk := conf.hashes[ch]
@@ -487,23 +511,15 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
}
if err != nil {
log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
- localSuccess = false
// Do not get crazy with logging the warn message
time.Sleep(500 * time.Millisecond)
- } else {
- log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+ continue REPEAT
}
- }
- if !localSuccess {
- allSuccess = false
- break
+ log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
+ return nil
}
- if !allSuccess {
- return fmt.Errorf("Not all chunks succeeded!")
- }
- return nil
})
if result.Error != nil {