diff options
author | holisticode <holistic.computing@gmail.com> | 2018-07-31 04:55:25 +0800 |
---|---|---|
committer | Balint Gabor <balint.g@gmail.com> | 2018-07-31 04:55:25 +0800 |
commit | d6efa691872efb723ea3177a92da9e9b31c34eba (patch) | |
tree | 9c7e85c9cab9a2cf1240db47a8de44162f69353e /swarm/network/stream/syncer_test.go | |
parent | 3ea8ac6a9ab9e56164707119e9142f06fae4c316 (diff) | |
download | dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.gz dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.tar.zst dexon-d6efa691872efb723ea3177a92da9e9b31c34eba.zip |
Merge netsim mig to master (#17241)
* swarm: merged stream-tests migration to develop
* swarm/network: expose simulation RandomUpNode to use in stream tests
* swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest
* swarm: enforce waitkademlia for snapshot tests
* swarm: fixed syncer tests and snapshot_sync_test
* swarm: linting of simulation package
* swarm: address review comments
* swarm/network/stream: fix delivery_test bugs and refactor
* swarm/network/stream: addressed PR comments @janos
* swarm/network/stream: enforce waitKademlia, improve TestIntervals
* swarm/network/stream: TestIntervals not waiting for chunk to be stored
Diffstat (limited to 'swarm/network/stream/syncer_test.go')
-rw-r--r-- | swarm/network/stream/syncer_test.go | 337 |
1 files changed, 156 insertions, 181 deletions
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index a3d53e648..f72aa3444 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -23,18 +23,22 @@ import ( "io" "io/ioutil" "math" + "os" "sync" "testing" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" - "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) const dataChunkCount = 200 @@ -46,222 +50,193 @@ func TestSyncerSimulation(t *testing.T) { testSyncBetweenNodes(t, 16, 1, dataChunkCount, true, 1) } -func createMockStore(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) { - var err error +func createMockStore(globalStore *mockdb.GlobalStore, id discover.NodeID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) { address := common.BytesToAddress(id.Bytes()) mockStore := globalStore.NewNodeStore(address) params := storage.NewDefaultLocalStoreParams() - datadirs[id], err = ioutil.TempDir("", "localMockStore-"+id.TerminalString()) + + datadir, err = ioutil.TempDir("", "localMockStore-"+id.TerminalString()) if err != nil { - return nil, err + return nil, "", err } - params.Init(datadirs[id]) + params.Init(datadir) params.BaseKey = addr.Over() - lstore, err := storage.NewLocalStore(params, mockStore) - return lstore, nil + lstore, err = storage.NewLocalStore(params, mockStore) + return lstore, datadir, nil } func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool, po uint8) { - defer setDefaultSkipCheck(defaultSkipCheck) - defaultSkipCheck = skipCheck - //data directories for each node and store - datadirs = make(map[discover.NodeID]string) - if *useMockStore { - createStoreFunc = createMockStore - createGlobalStore() - } else { - createStoreFunc = createTestLocalStorageFromSim - } - defer datadirsCleanup() + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + var store storage.ChunkStore + var globalStore *mockdb.GlobalStore + var gDir, datadir string + + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + //hack to put addresses in same space + addr.OAddr[0] = byte(0) + + if *useMockStore { + gDir, globalStore, err = createGlobalStore() + if err != nil { + return nil, nil, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") + } + store, datadir, err = createMockStore(globalStore, id, addr) + } else { + store, datadir, err = createTestLocalStorageForID(id, addr) + } + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + store.Close() + os.RemoveAll(datadir) + if *useMockStore { + err := globalStore.Close() + if err != nil { + log.Error("Error closing global store! %v", "err", err) + } + os.RemoveAll(gDir) + } + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + bucket.Store(bucketKeyDB, db) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + bucket.Store(bucketKeyDelivery, delivery) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + }) + + fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() - registries = make(map[discover.NodeID]*TestRegistry) - toAddr = func(id discover.NodeID) *network.BzzAddr { - addr := network.NewAddrFromNodeID(id) - //hack to put addresses in same space - addr.OAddr[0] = byte(0) - return addr - } - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: conns, - ToAddr: toAddr, - Services: services, - EnableMsgEvents: false, - } - // HACK: these are global variables in the test so that they are available for - // the service constructor function - // TODO: will this work with exec/docker adapter? - // localstore of nodes made available for action and check calls - stores = make(map[discover.NodeID]storage.ChunkStore) - deliveries = make(map[discover.NodeID]*Delivery) // create context for simulation run timeout := 30 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) // defer cancel should come before defer simulation teardown defer cancel() - // create simulation network with the config - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() - nodeIndex := make(map[discover.NodeID]int) - for i, id := range sim.IDs { - nodeIndex[id] = i - if !*useMockStore { - stores[id] = sim.Stores[i] - sim.Stores[i] = stores[id] + nodeIndex := make(map[discover.NodeID]int) + for i, id := range nodeIDs { + nodeIndex[id] = i } - } - // peerCount function gives the number of peer connections for a nodeID - // this is needed for the service run function to wait until - // each protocol instance runs and the streamer peers are available - peerCount = func(id discover.NodeID) int { - if sim.IDs[0] == id || sim.IDs[nodes-1] == id { - return 1 - } - return 2 - } - waitPeerErrC = make(chan error) - // create DBAPI-s for all nodes - dbs := make([]*storage.DBAPI, nodes) - for i := 0; i < nodes; i++ { - dbs[i] = storage.NewDBAPI(sim.Stores[i].(*storage.LocalStore)) - } - - // collect hashes in po 1 bin for each node - hashes := make([][]storage.Address, nodes) - totalHashes := 0 - hashCounts := make([]int, nodes) - for i := nodes - 1; i >= 0; i-- { - if i < nodes-1 { - hashCounts[i] = hashCounts[i+1] - } - dbs[i].Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { - hashes[i] = append(hashes[i], addr) - totalHashes++ - hashCounts[i]++ - return true - }) - } - - // errc is error channel for simulation - errc := make(chan error, 1) - quitC := make(chan struct{}) - defer close(quitC) + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal(d.Error) + } + } + }() - // action is subscribe - action := func(ctx context.Context) error { - // need to wait till an aynchronous process registers the peers in streamer.peers - // that is used by Subscribe - // the global peerCount function tells how many connections each node has - // TODO: this is to be reimplemented with peerEvent watcher without global var - i := 0 - for err := range waitPeerErrC { + // each node Subscribes to each other's swarmChunkServerStreamName + for j := 0; j < nodes-1; j++ { + id := nodeIDs[j] + client, err := sim.Net.GetNode(id).Client() if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) + t.Fatal(err) } - i++ - if i == nodes { - break + sid := nodeIDs[j+1] + client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top) + if err != nil { + return err } - } - // each node Subscribes to each other's swarmChunkServerStreamName - for j := 0; j < nodes-1; j++ { - id := sim.IDs[j] - sim.Stores[j] = stores[id] - err := sim.CallClient(id, func(client *rpc.Client) error { - // report disconnect events to the error channel cos peers should not disconnect - doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC) + if j > 0 || nodes == 2 { + item, ok := sim.NodeItem(nodeIDs[j], bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + fileStore := item.(*storage.FileStore) + size := chunkCount * chunkSize + _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) if err != nil { - return err + t.Fatal(err.Error()) } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() - }() - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - // start syncing, i.e., subscribe to upstream peers po 1 bin - sid := sim.IDs[j+1] - return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top) - }) - if err != nil { - return err + wait(ctx) } } // here we distribute chunks of a random file into stores 1...nodes - rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams()) - size := chunkCount * chunkSize - _, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) - if err != nil { - t.Fatal(err.Error()) - } - // need to wait cos we then immediately collect the relevant bin content - wait(ctx) - if err != nil { - t.Fatal(err.Error()) + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err } - return nil - } - - // this makes sure check is not called before the previous call finishes - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case err := <-errc: - return false, err - case <-ctx.Done(): - return false, ctx.Err() - default: + // collect hashes in po 1 bin for each node + hashes := make([][]storage.Address, nodes) + totalHashes := 0 + hashCounts := make([]int, nodes) + for i := nodes - 1; i >= 0; i-- { + if i < nodes-1 { + hashCounts[i] = hashCounts[i+1] + } + item, ok := sim.NodeItem(nodeIDs[i], bucketKeyDB) + if !ok { + return fmt.Errorf("No DB") + } + db := item.(*storage.DBAPI) + db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { + hashes[i] = append(hashes[i], addr) + totalHashes++ + hashCounts[i]++ + return true + }) } - - i := nodeIndex[id] var total, found int - - for j := i; j < nodes; j++ { - total += len(hashes[j]) - for _, key := range hashes[j] { - chunk, err := dbs[i].Get(ctx, key) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { - continue + for _, node := range nodeIDs { + i := nodeIndex[node] + + for j := i; j < nodes; j++ { + total += len(hashes[j]) + for _, key := range hashes[j] { + item, ok := sim.NodeItem(nodeIDs[j], bucketKeyDB) + if !ok { + return fmt.Errorf("No DB") + } + db := item.(*storage.DBAPI) + chunk, err := db.Get(ctx, key) + if err == storage.ErrFetching { + <-chunk.ReqC + } else if err != nil { + continue + } + // needed for leveldb not to be closed? + // chunk.WaitToStore() + found++ } - // needed for leveldb not to be closed? - // chunk.WaitToStore() - found++ } + log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total) } - log.Debug("sync check", "node", id, "index", i, "bin", po, "found", found, "total", total) - return total == found, nil - } + if total == found && total > 0 { + return nil + } + return fmt.Errorf("Total not equallying found: total is %d", total) + }) - conf.Step = &simulations.Step{ - Action: action, - Trigger: streamTesting.Trigger(500*time.Millisecond, quitC, sim.IDs[0:nodes-1]...), - Expect: &simulations.Expectation{ - Nodes: sim.IDs[0:1], - Check: check, - }, - } - startedAt := time.Now() - result, err := sim.Run(ctx, conf) - finishedAt := time.Now() - if err != nil { - t.Fatalf("Setting up simulation failed: %v", err) - } if result.Error != nil { - t.Fatalf("Simulation failed: %s", result.Error) + t.Fatal(result.Error) } - streamTesting.CheckResult(t, result, startedAt, finishedAt) } |