diff options
Diffstat (limited to 'swarm/network/stream/snapshot_sync_test.go')
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 170 |
1 files changed, 115 insertions, 55 deletions
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 313019d6a..7cd09099c 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "runtime" "sync" "testing" "time" @@ -39,15 +40,20 @@ import ( mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) -const testMinProxBinSize = 2 const MaxTimeout = 600 type synctestConfig struct { - addrs [][]byte - hashes []storage.Address - idToChunksMap map[discover.NodeID][]int - chunksToNodesMap map[string][]int - addrToIDMap map[string]discover.NodeID + addrs [][]byte + hashes []storage.Address + idToChunksMap map[discover.NodeID][]int + //chunksToNodesMap map[string][]int + addrToIDMap map[string]discover.NodeID +} + +// Tests in this file should not request chunks from peers. +// This function will panic indicating that there is a problem if request has been made. +func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { + panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String())) } //This test is a syncing test for nodes. @@ -58,6 +64,9 @@ type synctestConfig struct { //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. func TestSyncingViaGlobalSync(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { @@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) { } func TestSyncingViaDirectSubscribe(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) - err := testSyncingViaDirectSubscribe(*chunks, *nodes) + err := testSyncingViaDirectSubscribe(t, *chunks, *nodes) if err != nil { t.Fatal(err) } @@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) { for _, chnk := range chnkCnt { for _, n := range nodeCnt { log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n)) - err := testSyncingViaDirectSubscribe(chnk, n) + err := testSyncingViaDirectSubscribe(t, chnk, n) if err != nil { t.Fatal(err) } @@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { t.Fatal(err) } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { conf.hashes = append(conf.hashes, hashes...) mapKeysToNodes(conf) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - return err - } - // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. allSuccess := false @@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { }() } for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { if result.Error != nil { t.Fatal(result.Error) } + log.Info("Simulation ended") } /* @@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. The snapshot should have 'streamer' in its service list. */ -func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { +func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { @@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, }) defer sim.Close() - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() conf := &synctestConfig{} @@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return err } + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { // or until the timeout is reached. allSuccess := false for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return result.Error } - log.Info("Simulation terminated") + log.Info("Simulation ended") return nil } @@ -462,10 +527,9 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //iterate over each bin and solicit needed subscription to bins kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool { //identify begin and start index of the bin(s) we want to subscribe to - histRange := &Range{} subCnt++ - err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top) + err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High) if err != nil { log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err)) return false @@ -478,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //map chunk keys to addresses which are responsible func mapKeysToNodes(conf *synctestConfig) { - kmap := make(map[string][]int) nodemap := make(map[string][]int) //build a pot for chunk hashes np := pot.NewPot(nil, 0) @@ -487,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) { indexmap[string(a)] = i np, _, _ = pot.Add(np, a, pof) } + + var kadMinProxSize = 2 + + ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs) + //for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes)) for i := 0; i < len(conf.hashes); i++ { - pl := 256 //highest possible proximity - var nns []int + var a []byte np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool { - a := val.([]byte) - if pl < 256 && pl != po { - return false - } - if pl == 256 || pl == po { - log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)])) - nns = append(nns, indexmap[string(a)]) - nodemap[string(a)] = append(nodemap[string(a)], i) - } - if pl == 256 && len(nns) >= testMinProxBinSize { - //maxProxBinSize has been reached at this po, so save it - //we will add all other nodes at the same po - pl = po - } - return true + // take the first address + a = val.([]byte) + return false }) - kmap[string(conf.hashes[i])] = nns + + nns := ppmap[common.Bytes2Hex(a)].NNSet + nns = append(nns, a) + + for _, p := range nns { + nodemap[string(p)] = append(nodemap[string(p)], i) + } } for addr, chunks := range nodemap { //this selects which chunks are expected to be found with the given node conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks } log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap)) - conf.chunksToNodesMap = kmap } //upload a file(chunks) to a single local node store |