diff options
author | holisticode <holistic.computing@gmail.com> | 2018-10-09 02:28:44 +0800 |
---|---|---|
committer | Viktor TrĂ³n <viktor.tron@gmail.com> | 2018-10-09 02:28:44 +0800 |
commit | 11d0ff6578c34b724436dbeeede726b31b41c8b8 (patch) | |
tree | 2aa195426b18494e92d160b244b5efd3650ba9fa /swarm/network/stream/snapshot_sync_test.go | |
parent | 72a076840bea4a3258a3a8d9a7aeb750fcc2ac02 (diff) | |
download | go-tangerine-11d0ff6578c34b724436dbeeede726b31b41c8b8.tar.gz go-tangerine-11d0ff6578c34b724436dbeeede726b31b41c8b8.tar.zst go-tangerine-11d0ff6578c34b724436dbeeede726b31b41c8b8.zip |
Fix retrieval tests and simulation backends (#17723)
* swarm/network/stream: introduced visualized snapshot sync test
* swarm/network/stream: non-existing hash visualization sim
* swarm/network/stream: fixed retrieval tests; new backend for visualization
* swarm/network/stream: cleanup of visualized_snapshot_sync_sim_test.go
* swarm/network/stream: rebased PR on master
* swarm/network/stream: fixed loop logic in retrieval tests
* swarm/network/stream: fixed iterations for snapshot tests
* swarm/network/stream: address PR comments
* swarm/network/stream: addressed PR comments
Diffstat (limited to 'swarm/network/stream/snapshot_sync_test.go')
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 150 |
1 files changed, 83 insertions, 67 deletions
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index d93afce1b..0d5849487 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" @@ -50,6 +51,17 @@ type synctestConfig struct { addrToIDMap map[string]enode.ID } +const ( + // EventTypeNode is the type of event emitted when a node is either + // created, started or stopped + EventTypeChunkCreated simulations.EventType = "chunkCreated" + EventTypeChunkOffered simulations.EventType = "chunkOffered" + EventTypeChunkWanted simulations.EventType = "chunkWanted" + EventTypeChunkDelivered simulations.EventType = "chunkDelivered" + EventTypeChunkArrived simulations.EventType = "chunkArrived" + EventTypeSimTerminated simulations.EventType = "simTerminated" +) + // Tests in this file should not request chunks from peers. // This function will panic indicating that there is a problem if request has been made. func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) { @@ -131,41 +143,46 @@ func TestSyncingViaDirectSubscribe(t *testing.T) { } } -func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - n := ctx.Config.Node() - addr := network.NewAddr(n) - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) - if err != nil { - return nil, nil, err - } - bucket.Store(bucketKeyStore, store) - localStore := store.(*storage.LocalStore) - netStore, err := storage.NewNetStore(localStore, nil) - if err != nil { - return nil, nil, err - } - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, netStore) - netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New +var simServiceMap = map[string]simulation.ServiceFunc{ + "streamer": streamerFunc, +} - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, - SyncUpdateDelay: 3 * time.Second, - }) - bucket.Store(bucketKeyRegistry, r) +func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + n := ctx.Config.Node() + addr := network.NewAddr(n) + store, datadir, err := createTestLocalStorageForID(n.ID(), addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - cleanup = func() { - os.RemoveAll(datadir) - netStore.Close() - r.Close() - } + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + DoSync: true, + SyncUpdateDelay: 3 * time.Second, + }) - return r, cleanup, nil + bucket.Store(bucketKeyRegistry, r) - }, - }) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + + return r, cleanup, nil + +} + +func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { + sim := simulation.New(simServiceMap) defer sim.Close() log.Info("Initializing test config") @@ -204,7 +221,17 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { } }() - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + result := runSim(conf, ctx, sim, chunkCount) + + if result.Error != nil { + t.Fatal(result.Error) + } + log.Info("Simulation ended") +} + +func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result { + + return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { //get the kademlia overlay address from this ID @@ -229,12 +256,19 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { if err != nil { return err } + for _, h := range hashes { + evt := &simulations.Event{ + Type: EventTypeChunkCreated, + Node: sim.Net.GetNode(node.ID), + Data: h.String(), + } + sim.Net.Events().Send(evt) + } conf.hashes = append(conf.hashes, hashes...) mapKeysToNodes(conf) // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - allSuccess := false var gDir string var globalStore *mockdb.GlobalStore if *useMockStore { @@ -250,12 +284,11 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { } }() } - for !allSuccess { - allSuccess = true + REPEAT: + for { for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] - localSuccess := true for _, ch := range localChunks { //get the real chunk by the index in the index array chunk := conf.hashes[ch] @@ -277,29 +310,22 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { } if err != nil { log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) - localSuccess = false // Do not get crazy with logging the warn message time.Sleep(500 * time.Millisecond) - } else { - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + continue REPEAT } - } - if !localSuccess { - allSuccess = false - break + evt := &simulations.Event{ + Type: EventTypeChunkArrived, + Node: sim.Net.GetNode(id), + Data: chunk.String(), + } + sim.Net.Events().Send(evt) + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } + return nil } - if !allSuccess { - return fmt.Errorf("Not all chunks succeeded!") - } - return nil }) - - if result.Error != nil { - t.Fatal(result.Error) - } - log.Info("Simulation ended") } /* @@ -459,13 +485,11 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) } // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. - allSuccess := false - for !allSuccess { - allSuccess = true + REPEAT: + for { for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] - localSuccess := true for _, ch := range localChunks { //get the real chunk by the index in the index array chunk := conf.hashes[ch] @@ -487,23 +511,15 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) } if err != nil { log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) - localSuccess = false // Do not get crazy with logging the warn message time.Sleep(500 * time.Millisecond) - } else { - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + continue REPEAT } - } - if !localSuccess { - allSuccess = false - break + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } + return nil } - if !allSuccess { - return fmt.Errorf("Not all chunks succeeded!") - } - return nil }) if result.Error != nil { |