aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--swarm/network/simulation/bucket.go2
-rw-r--r--swarm/network/simulation/connect.go4
-rw-r--r--swarm/network/simulation/events.go11
-rw-r--r--swarm/network/simulation/http.go11
-rw-r--r--swarm/network/simulation/http_test.go7
-rw-r--r--swarm/network/simulation/node.go19
-rw-r--r--swarm/network/simulation/service.go2
-rw-r--r--swarm/network/stream/common_test.go364
-rw-r--r--swarm/network/stream/delivery_test.go572
-rw-r--r--swarm/network/stream/intervals_test.go506
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go865
-rw-r--r--swarm/network/stream/snapshot_sync_test.go805
-rw-r--r--swarm/network/stream/syncer_test.go337
-rw-r--r--swarm/network/stream/testing/testing.go293
14 files changed, 1365 insertions, 2433 deletions
diff --git a/swarm/network/simulation/bucket.go b/swarm/network/simulation/bucket.go
index b37afaaa4..ddbedb521 100644
--- a/swarm/network/simulation/bucket.go
+++ b/swarm/network/simulation/bucket.go
@@ -43,7 +43,7 @@ func (s *Simulation) SetNodeItem(id discover.NodeID, key interface{}, value inte
s.buckets[id].Store(key, value)
}
-// NodeItems returns a map of items from all nodes that are all set under the
+// NodesItems returns a map of items from all nodes that are all set under the
// same BucketKey.
func (s *Simulation) NodesItems(key interface{}) (values map[discover.NodeID]interface{}) {
s.mu.RLock()
diff --git a/swarm/network/simulation/connect.go b/swarm/network/simulation/connect.go
index 3fe82052b..3d0f6cb3f 100644
--- a/swarm/network/simulation/connect.go
+++ b/swarm/network/simulation/connect.go
@@ -54,7 +54,7 @@ func (s *Simulation) ConnectToLastNode(id discover.NodeID) (err error) {
// ConnectToRandomNode connects the node with provieded NodeID
// to a random node that is up.
func (s *Simulation) ConnectToRandomNode(id discover.NodeID) (err error) {
- n := s.randomUpNode(id)
+ n := s.RandomUpNode(id)
if n == nil {
return ErrNodeNotFound
}
@@ -135,7 +135,7 @@ func (s *Simulation) ConnectNodesStar(id discover.NodeID, ids []discover.NodeID)
return nil
}
-// ConnectNodesStar connects all nodes in a star topology
+// ConnectNodesStarPivot connects all nodes in a star topology
// with the center at already set pivot node.
// If ids argument is nil, all nodes that are up will be connected.
func (s *Simulation) ConnectNodesStarPivot(ids []discover.NodeID) (err error) {
diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go
index f9cfadb73..980a9a756 100644
--- a/swarm/network/simulation/events.go
+++ b/swarm/network/simulation/events.go
@@ -18,6 +18,7 @@ package simulation
import (
"context"
+ "sync"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -71,24 +72,32 @@ func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter {
func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filters ...*PeerEventsFilter) <-chan PeerEvent {
eventC := make(chan PeerEvent)
+ // wait group to make sure all subscriptions to admin peerEvents are established
+ // before this function returns.
+ var subsWG sync.WaitGroup
for _, id := range ids {
s.shutdownWG.Add(1)
+ subsWG.Add(1)
go func(id discover.NodeID) {
defer s.shutdownWG.Done()
client, err := s.Net.GetNode(id).Client()
if err != nil {
+ subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(ctx, "admin", events, "peerEvents")
if err != nil {
+ subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
defer sub.Unsubscribe()
+ subsWG.Done()
+
for {
select {
case <-ctx.Done():
@@ -153,5 +162,7 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filt
}(id)
}
+ // wait all subscriptions
+ subsWG.Wait()
return eventC
}
diff --git a/swarm/network/simulation/http.go b/swarm/network/simulation/http.go
index 40f13f32d..69ae3baec 100644
--- a/swarm/network/simulation/http.go
+++ b/swarm/network/simulation/http.go
@@ -29,7 +29,7 @@ var (
DefaultHTTPSimAddr = ":8888"
)
-//`With`(builder) pattern constructor for Simulation to
+//WithServer implements the builder pattern constructor for Simulation to
//start with a HTTP server
func (s *Simulation) WithServer(addr string) *Simulation {
//assign default addr if nothing provided
@@ -46,7 +46,12 @@ func (s *Simulation) WithServer(addr string) *Simulation {
Addr: addr,
Handler: s.handler,
}
- go s.httpSrv.ListenAndServe()
+ go func() {
+ err := s.httpSrv.ListenAndServe()
+ if err != nil {
+ log.Error("Error starting the HTTP server", "error", err)
+ }
+ }()
return s
}
@@ -55,7 +60,7 @@ func (s *Simulation) addSimulationRoutes() {
s.handler.POST("/runsim", s.RunSimulation)
}
-// StartNetwork starts all nodes in the network
+// RunSimulation is the actual POST endpoint runner
func (s *Simulation) RunSimulation(w http.ResponseWriter, req *http.Request) {
log.Debug("RunSimulation endpoint running")
s.runC <- struct{}{}
diff --git a/swarm/network/simulation/http_test.go b/swarm/network/simulation/http_test.go
index 4d8bf9946..775cf9219 100644
--- a/swarm/network/simulation/http_test.go
+++ b/swarm/network/simulation/http_test.go
@@ -96,7 +96,12 @@ func sendRunSignal(t *testing.T) {
if err != nil {
t.Fatalf("Request failed: %v", err)
}
- defer resp.Body.Close()
+ defer func() {
+ err := resp.Body.Close()
+ if err != nil {
+ log.Error("Error closing response body", "err", err)
+ }
+ }()
log.Debug("Signal sent")
if resp.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp.Status)
diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go
index bc433cfd8..784588fa6 100644
--- a/swarm/network/simulation/node.go
+++ b/swarm/network/simulation/node.go
@@ -195,7 +195,7 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i
return ids, nil
}
-//Upload a snapshot
+//UploadSnapshot uploads a snapshot to the simulation
//This method tries to open the json file provided, applies the config to all nodes
//and then loads the snapshot into the Simulation network
func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error {
@@ -203,7 +203,12 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption)
if err != nil {
return err
}
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ log.Error("Error closing snapshot file", "err", err)
+ }
+ }()
jsonbyte, err := ioutil.ReadAll(f)
if err != nil {
return err
@@ -294,7 +299,7 @@ func (s *Simulation) StopNode(id discover.NodeID) (err error) {
// StopRandomNode stops a random node.
func (s *Simulation) StopRandomNode() (id discover.NodeID, err error) {
- n := s.randomUpNode()
+ n := s.RandomUpNode()
if n == nil {
return id, ErrNodeNotFound
}
@@ -324,18 +329,18 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
-// randomUpNode returns a random SimNode that is up.
+// RandomUpNode returns a random SimNode that is up.
// Arguments are NodeIDs for nodes that should not be returned.
-func (s *Simulation) randomUpNode(exclude ...discover.NodeID) *adapters.SimNode {
+func (s *Simulation) RandomUpNode(exclude ...discover.NodeID) *adapters.SimNode {
return s.randomNode(s.UpNodeIDs(), exclude...)
}
-// randomUpNode returns a random SimNode that is not up.
+// randomDownNode returns a random SimNode that is not up.
func (s *Simulation) randomDownNode(exclude ...discover.NodeID) *adapters.SimNode {
return s.randomNode(s.DownNodeIDs(), exclude...)
}
-// randomUpNode returns a random SimNode from the slice of NodeIDs.
+// randomNode returns a random SimNode from the slice of NodeIDs.
func (s *Simulation) randomNode(ids []discover.NodeID, exclude ...discover.NodeID) *adapters.SimNode {
for _, e := range exclude {
var i int
diff --git a/swarm/network/simulation/service.go b/swarm/network/simulation/service.go
index d1cbf1f8b..02e7ad0cc 100644
--- a/swarm/network/simulation/service.go
+++ b/swarm/network/simulation/service.go
@@ -39,7 +39,7 @@ func (s *Simulation) Service(name string, id discover.NodeID) node.Service {
// RandomService returns a single Service by name on a
// randomly chosen node that is up.
func (s *Simulation) RandomService(name string) node.Service {
- n := s.randomUpNode()
+ n := s.RandomUpNode()
if n == nil {
return nil
}
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 4d55c6ee3..491dc9fd5 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -18,135 +18,70 @@ package stream
import (
"context"
- "encoding/binary"
+ crand "crypto/rand"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
+ "math/rand"
"os"
+ "strings"
"sync/atomic"
"testing"
"time"
- "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
- "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/ethereum/go-ethereum/swarm/storage/mock"
- "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
+ mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
colorable "github.com/mattn/go-colorable"
)
var (
- deliveries map[discover.NodeID]*Delivery
- stores map[discover.NodeID]storage.ChunkStore
- toAddr func(discover.NodeID) *network.BzzAddr
- peerCount func(discover.NodeID) int
- adapter = flag.String("adapter", "sim", "type of simulation: sim|exec|docker")
loglevel = flag.Int("loglevel", 2, "verbosity of logs")
nodes = flag.Int("nodes", 0, "number of nodes")
chunks = flag.Int("chunks", 0, "number of chunks")
useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)")
-)
+ longrunning = flag.Bool("longrunning", false, "do run long-running tests")
-var (
- defaultSkipCheck bool
- waitPeerErrC chan error
- chunkSize = 4096
- registries map[discover.NodeID]*TestRegistry
- createStoreFunc func(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error)
- getRetrieveFunc = defaultRetrieveFunc
- subscriptionCount = 0
- globalStore mock.GlobalStorer
- globalStoreDir string
-)
+ bucketKeyDB = simulation.BucketKey("db")
+ bucketKeyStore = simulation.BucketKey("store")
+ bucketKeyFileStore = simulation.BucketKey("filestore")
+ bucketKeyNetStore = simulation.BucketKey("netstore")
+ bucketKeyDelivery = simulation.BucketKey("delivery")
+ bucketKeyRegistry = simulation.BucketKey("registry")
-var services = adapters.Services{
- "streamer": NewStreamerService,
- "intervalsStreamer": newIntervalsStreamerService,
-}
+ chunkSize = 4096
+ pof = pot.DefaultPof(256)
+)
func init() {
flag.Parse()
- // register the Delivery service which will run as a devp2p
- // protocol when using the exec adapter
- adapters.RegisterServices(services)
+ rand.Seed(time.Now().UnixNano())
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}
-func createGlobalStore() {
- var err error
- globalStoreDir, err = ioutil.TempDir("", "global.store")
+func createGlobalStore() (string, *mockdb.GlobalStore, error) {
+ var globalStore *mockdb.GlobalStore
+ globalStoreDir, err := ioutil.TempDir("", "global.store")
if err != nil {
log.Error("Error initiating global store temp directory!", "err", err)
- return
+ return "", nil, err
}
- globalStore, err = db.NewGlobalStore(globalStoreDir)
+ globalStore, err = mockdb.NewGlobalStore(globalStoreDir)
if err != nil {
log.Error("Error initiating global store!", "err", err)
+ return "", nil, err
}
-}
-
-// NewStreamerService
-func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
- var err error
- id := ctx.Config.ID
- addr := toAddr(id)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- stores[id], err = createStoreFunc(id, addr)
- if err != nil {
- return nil, err
- }
- store := stores[id].(*storage.LocalStore)
- db := storage.NewDBAPI(store)
- delivery := NewDelivery(kad, db)
- deliveries[id] = delivery
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: defaultSkipCheck,
- DoRetrieve: false,
- })
- RegisterSwarmSyncerServer(r, db)
- RegisterSwarmSyncerClient(r, db)
- go func() {
- waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
- }()
- fileStore := storage.NewFileStore(storage.NewNetStore(store, getRetrieveFunc(id)), storage.NewFileStoreParams())
- testRegistry := &TestRegistry{Registry: r, fileStore: fileStore}
- registries[id] = testRegistry
- return testRegistry, nil
-}
-
-func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error {
- return nil
-}
-
-func datadirsCleanup() {
- for _, id := range ids {
- os.RemoveAll(datadirs[id])
- }
- if globalStoreDir != "" {
- os.RemoveAll(globalStoreDir)
- }
-}
-
-//local stores need to be cleaned up after the sim is done
-func localStoreCleanup() {
- log.Info("Cleaning up...")
- for _, id := range ids {
- registries[id].Close()
- stores[id].Close()
- }
- log.Info("Local store cleanup done")
+ return globalStoreDir, globalStore, nil
}
func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
@@ -174,9 +109,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
db := storage.NewDBAPI(localStore)
delivery := NewDelivery(to, db)
- streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: defaultSkipCheck,
- })
+ streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
teardown := func() {
streamer.Close()
removeDataDir()
@@ -233,22 +166,6 @@ func (rrs *roundRobinStore) Close() {
}
}
-type TestRegistry struct {
- *Registry
- fileStore *storage.FileStore
-}
-
-func (r *TestRegistry) APIs() []rpc.API {
- a := r.Registry.APIs()
- a = append(a, rpc.API{
- Namespace: "stream",
- Version: "3.0",
- Service: r,
- Public: true,
- })
- return a
-}
-
func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
r, _ := fileStore.Retrieve(context.TODO(), hash)
buf := make([]byte, 1024)
@@ -265,185 +182,74 @@ func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
return total, nil
}
-func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) {
- return readAll(r.fileStore, hash[:])
-}
-
-func (r *TestRegistry) Start(server *p2p.Server) error {
- return r.Registry.Start(server)
-}
-
-func (r *TestRegistry) Stop() error {
- return r.Registry.Stop()
-}
-
-type TestExternalRegistry struct {
- *Registry
-}
-
-func (r *TestExternalRegistry) APIs() []rpc.API {
- a := r.Registry.APIs()
- a = append(a, rpc.API{
- Namespace: "stream",
- Version: "3.0",
- Service: r,
- Public: true,
- })
- return a
-}
-
-func (r *TestExternalRegistry) GetHashes(ctx context.Context, peerId discover.NodeID, s Stream) (*rpc.Subscription, error) {
- peer := r.getPeer(peerId)
-
- client, err := peer.getClient(ctx, s)
- if err != nil {
- return nil, err
- }
-
- c := client.Client.(*testExternalClient)
+func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) {
+ nodes := sim.UpNodeIDs()
+ nodeCnt := len(nodes)
+ log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
+ //array holding generated files
+ rfiles := make([]string, nodeCnt)
+ //array holding the root hashes of the files
+ rootAddrs := make([]storage.Address, nodeCnt)
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return nil, fmt.Errorf("Subscribe not supported")
- }
-
- sub := notifier.CreateSubscription()
-
- go func() {
- // if we begin sending event immediately some events
- // will probably be dropped since the subscription ID might not be send to
- // the client.
- // ref: rpc/subscription_test.go#L65
- time.Sleep(1 * time.Second)
- for {
- select {
- case h := <-c.hashes:
- <-c.enableNotificationsC // wait for notification subscription to complete
- if err := notifier.Notify(sub.ID, h); err != nil {
- log.Warn(fmt.Sprintf("rpc sub notifier notify stream %s: %v", s, err))
- }
- case err := <-sub.Err():
- if err != nil {
- log.Warn(fmt.Sprintf("caught subscription error in stream %s: %v", s, err))
- }
- case <-notifier.Closed():
- log.Trace(fmt.Sprintf("rpc sub notifier closed"))
- return
- }
+ var err error
+ //for every node, generate a file and upload
+ for i, id := range nodes {
+ item, ok := sim.NodeItem(id, bucketKeyFileStore)
+ if !ok {
+ return nil, nil, fmt.Errorf("Error accessing localstore")
}
- }()
-
- return sub, nil
-}
-
-func (r *TestExternalRegistry) EnableNotifications(peerId discover.NodeID, s Stream) error {
- peer := r.getPeer(peerId)
-
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- client, err := peer.getClient(ctx, s)
- if err != nil {
- return err
- }
-
- close(client.Client.(*testExternalClient).enableNotificationsC)
-
- return nil
-}
-
-// TODO: merge functionalities of testExternalClient and testExternalServer
-// with testClient and testServer.
-
-type testExternalClient struct {
- hashes chan []byte
- db *storage.DBAPI
- enableNotificationsC chan struct{}
-}
-
-func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
- return &testExternalClient{
- hashes: make(chan []byte),
- db: db,
- enableNotificationsC: make(chan struct{}),
- }
-}
-
-func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
- chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
- if chunk.ReqC == nil {
- return nil
- }
- c.hashes <- hash
- return func() {
- chunk.WaitToStore()
+ fileStore := item.(*storage.FileStore)
+ //generate a file
+ rfiles[i], err = generateRandomFile()
+ if err != nil {
+ return nil, nil, err
+ }
+ //store it (upload it) on the FileStore
+ ctx := context.TODO()
+ rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
+ log.Debug("Uploaded random string file to node")
+ if err != nil {
+ return nil, nil, err
+ }
+ err = wait(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
+ rootAddrs[i] = rk
}
+ return rootAddrs, rfiles, nil
}
-func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
- return nil
-}
-
-func (c *testExternalClient) Close() {}
-
-const testExternalServerBatchSize = 10
-
-type testExternalServer struct {
- t string
- keyFunc func(key []byte, index uint64)
- sessionAt uint64
- maxKeys uint64
- streamer *TestExternalRegistry
-}
-
-func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
- if keyFunc == nil {
- keyFunc = binary.BigEndian.PutUint64
- }
- return &testExternalServer{
- t: t,
- keyFunc: keyFunc,
- sessionAt: sessionAt,
- maxKeys: maxKeys,
+//generate a random file (string)
+func generateRandomFile() (string, error) {
+ //generate a random file size between minFileSize and maxFileSize
+ fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
+ log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
+ b := make([]byte, fileSize*1024)
+ _, err := crand.Read(b)
+ if err != nil {
+ log.Error("Error generating random file.", "err", err)
+ return "", err
}
+ return string(b), nil
}
-func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
- if from == 0 && to == 0 {
- from = s.sessionAt
- to = s.sessionAt + testExternalServerBatchSize
- }
- if to-from > testExternalServerBatchSize {
- to = from + testExternalServerBatchSize - 1
- }
- if from >= s.maxKeys && to > s.maxKeys {
- return nil, 0, 0, nil, io.EOF
- }
- if to > s.maxKeys {
- to = s.maxKeys
+//create a local store for the given node
+func createTestLocalStorageForID(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, string, error) {
+ var datadir string
+ var err error
+ datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
+ if err != nil {
+ return nil, "", err
}
- b := make([]byte, HashSize*(to-from+1))
- for i := from; i <= to; i++ {
- s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
+ var store storage.ChunkStore
+ params := storage.NewDefaultLocalStoreParams()
+ params.ChunkDbPath = datadir
+ params.BaseKey = addr.Over()
+ store, err = storage.NewTestLocalStoreForAddr(params)
+ if err != nil {
+ os.RemoveAll(datadir)
+ return nil, "", err
}
- return b, from, to, nil, nil
-}
-
-func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
- return make([]byte, 4096), nil
-}
-
-func (s *testExternalServer) Close() {}
-
-// Sets the global value defaultSkipCheck.
-// It should be used in test function defer to reset the global value
-// to the original value.
-//
-// defer setDefaultSkipCheck(defaultSkipCheck)
-// defaultSkipCheck = skipCheck
-//
-// This works as defer function arguments evaluations are evaluated as ususal,
-// but only the function body invocation is deferred.
-func setDefaultSkipCheck(skipCheck bool) {
- defaultSkipCheck = skipCheck
+ return store, datadir, nil
}
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index f3da893a2..ae007e5b0 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -22,18 +22,19 @@ import (
crand "crypto/rand"
"fmt"
"io"
+ "os"
"sync"
"testing"
"time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
- "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
- streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -308,159 +309,164 @@ func TestDeliveryFromNodes(t *testing.T) {
}
func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) {
- defaultSkipCheck = skipCheck
- toAddr = network.NewAddrFromNodeID
- createStoreFunc = createTestLocalStorageFromSim
- conf := &streamTesting.RunConfig{
- Adapter: *adapter,
- NodeCount: nodes,
- ConnLevel: conns,
- ToAddr: toAddr,
- Services: services,
- EnableMsgEvents: false,
- }
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- sim, teardown, err := streamTesting.NewSimulation(conf)
- var rpcSubscriptionsWg sync.WaitGroup
- defer func() {
- rpcSubscriptionsWg.Wait()
- teardown()
- }()
- if err != nil {
- t.Fatal(err.Error())
- }
- stores = make(map[discover.NodeID]storage.ChunkStore)
- for i, id := range sim.IDs {
- stores[id] = sim.Stores[i]
- }
- registries = make(map[discover.NodeID]*TestRegistry)
- deliveries = make(map[discover.NodeID]*Delivery)
- peerCount = func(id discover.NodeID) int {
- if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
- return 1
- }
- return 2
- }
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ store, datadir, err := createTestLocalStorageForID(id, addr)
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ store.Close()
+ }
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
- // here we distribute chunks of a random file into Stores of nodes 1 to nodes
- rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
- size := chunkCount * chunkSize
- ctx := context.TODO()
- fileHash, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
- // wait until all chunks stored
- if err != nil {
- t.Fatal(err.Error())
- }
- err = wait(ctx)
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ })
+ bucket.Store(bucketKeyRegistry, r)
+
+ retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
+ return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
+ }
+ netStore := storage.NewNetStore(localStore, retrieveFunc)
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ return r, cleanup, nil
+
+ },
+ })
+ defer sim.Close()
+
+ log.Info("Adding nodes to simulation")
+ _, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
- t.Fatal(err.Error())
+ t.Fatal(err)
}
- errc := make(chan error, 1)
- waitPeerErrC = make(chan error)
- quitC := make(chan struct{})
- defer close(quitC)
-
- action := func(ctx context.Context) error {
- // each node Subscribes to each other's swarmChunkServerStreamName
- // need to wait till an aynchronous process registers the peers in streamer.peers
- // that is used by Subscribe
- // using a global err channel to share betweem action and node service
+
+ log.Info("Starting simulation")
+ ctx := context.Background()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ //determine the pivot node to be the first node of the simulation
+ sim.SetPivotNode(nodeIDs[0])
+ //distribute chunks of a random file into Stores of nodes 1 to nodes
+ //we will do this by creating a file store with an underlying round-robin store:
+ //the file store will create a hash for the uploaded file, but every chunk will be
+ //distributed to different nodes via round-robin scheduling
+ log.Debug("Writing file to round-robin file store")
+ //to do this, we create an array for chunkstores (length minus one, the pivot node)
+ stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+ //we then need to get all stores from the sim....
+ lStores := sim.NodesItems(bucketKeyStore)
i := 0
- for err := range waitPeerErrC {
- if err != nil {
- return fmt.Errorf("error waiting for peers: %s", err)
+ //...iterate the buckets...
+ for id, bucketVal := range lStores {
+ //...and remove the one which is the pivot node
+ if id == *sim.PivotNodeID() {
+ continue
}
+ //the other ones are added to the array...
+ stores[i] = bucketVal.(storage.ChunkStore)
i++
- if i == nodes {
- break
- }
+ }
+ //...which then gets passed to the round-robin file store
+ roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+ //now we can actually upload a (random) file to the round-robin store
+ size := chunkCount * chunkSize
+ log.Debug("Storing data to file store")
+ fileHash, wait, err := roundRobinFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
+ // wait until all chunks stored
+ if err != nil {
+ return err
+ }
+ err = wait(ctx)
+ if err != nil {
+ return err
}
- // each node subscribes to the upstream swarm chunk server stream
- // which responds to chunk retrieve requests all but the last node in the chain does not
- for j := 0; j < nodes-1; j++ {
- id := sim.IDs[j]
- err := sim.CallClient(id, func(client *rpc.Client) error {
- doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
- if err != nil {
- return err
- }
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-doneC
- rpcSubscriptionsWg.Done()
- }()
- ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
- defer cancel()
- sid := sim.IDs[j+1]
- return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
- })
+ //each of the nodes (except pivot node) subscribes to the stream of the next node
+ for j, node := range nodeIDs[0 : nodes-1] {
+ sid := nodeIDs[j+1]
+ item, ok := sim.NodeItem(node, bucketKeyRegistry)
+ if !ok {
+ return fmt.Errorf("No registry")
+ }
+ registry := item.(*Registry)
+ err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
if err != nil {
return err
}
}
- // create a retriever FileStore for the pivot node
- delivery := deliveries[sim.IDs[0]]
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+ //get the pivot node's filestore
+ item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
+ }
+ pivotFileStore := item.(*storage.FileStore)
+ log.Debug("Starting retrieval routine")
go func() {
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
- n, err := readAll(fileStore, fileHash)
+ n, err := readAll(pivotFileStore, fileHash)
log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
if err != nil {
- errc <- fmt.Errorf("requesting chunks action error: %v", err)
+ t.Fatalf("requesting chunks action error: %v", err)
}
}()
- return nil
- }
- check := func(ctx context.Context, id discover.NodeID) (bool, error) {
- select {
- case err := <-errc:
- return false, err
- case <-ctx.Done():
- return false, ctx.Err()
- default:
+
+ log.Debug("Waiting for kademlia")
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
}
+
+ log.Debug("Watching for disconnections")
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ if d.Error != nil {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal(d.Error)
+ }
+ }
+ }()
+
+ //finally check that the pivot node gets all chunks via the root hash
+ log.Debug("Check retrieval")
+ success := true
var total int64
- err := sim.CallClient(id, func(client *rpc.Client) error {
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- return client.CallContext(ctx, &total, "stream_readAll", common.BytesToHash(fileHash))
- })
+ total, err = readAll(pivotFileStore, fileHash)
+ if err != nil {
+ return err
+ }
log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
if err != nil || total != int64(size) {
- return false, nil
+ success = false
}
- return true, nil
- }
- conf.Step = &simulations.Step{
- Action: action,
- Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
- // we are only testing the pivot node (net.Nodes[0])
- Expect: &simulations.Expectation{
- Nodes: sim.IDs[0:1],
- Check: check,
- },
- }
- startedAt := time.Now()
- timeout := 300 * time.Second
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- result, err := sim.Run(ctx, conf)
- finishedAt := time.Now()
- if err != nil {
- t.Fatalf("Setting up simulation failed: %v", err)
- }
+ if !success {
+ return fmt.Errorf("Test failed, chunks not available on all nodes")
+ }
+ log.Debug("Test terminated successfully")
+ return nil
+ })
if result.Error != nil {
- t.Fatalf("Simulation failed: %s", result.Error)
+ t.Fatal(result.Error)
}
- streamTesting.CheckResult(t, result, startedAt, finishedAt)
}
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
@@ -490,218 +496,146 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
}
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
- defaultSkipCheck = skipCheck
- toAddr = network.NewAddrFromNodeID
- createStoreFunc = createTestLocalStorageFromSim
- registries = make(map[discover.NodeID]*TestRegistry)
-
- timeout := 300 * time.Second
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
-
- conf := &streamTesting.RunConfig{
- Adapter: *adapter,
- NodeCount: nodes,
- ConnLevel: conns,
- ToAddr: toAddr,
- Services: services,
- EnableMsgEvents: false,
- }
- sim, teardown, err := streamTesting.NewSimulation(conf)
- var rpcSubscriptionsWg sync.WaitGroup
- defer func() {
- rpcSubscriptionsWg.Wait()
- teardown()
- }()
- if err != nil {
- b.Fatal(err.Error())
- }
-
- stores = make(map[discover.NodeID]storage.ChunkStore)
- deliveries = make(map[discover.NodeID]*Delivery)
- for i, id := range sim.IDs {
- stores[id] = sim.Stores[i]
- }
- peerCount = func(id discover.NodeID) int {
- if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
- return 1
- }
- return 2
- }
- // wait channel for all nodes all peer connections to set up
- waitPeerErrC = make(chan error)
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- // create a FileStore for the last node in the chain which we are gonna write to
- remoteFileStore := storage.NewFileStore(sim.Stores[nodes-1], storage.NewFileStoreParams())
-
- // channel to signal simulation initialisation with action call complete
- // or node disconnections
- disconnectC := make(chan error)
- quitC := make(chan struct{})
-
- initC := make(chan error)
-
- action := func(ctx context.Context) error {
- // each node Subscribes to each other's swarmChunkServerStreamName
- // need to wait till an aynchronous process registers the peers in streamer.peers
- // that is used by Subscribe
- // waitPeerErrC using a global err channel to share betweem action and node service
- i := 0
- for err := range waitPeerErrC {
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
- return fmt.Errorf("error waiting for peers: %s", err)
+ return nil, nil, err
}
- i++
- if i == nodes {
- break
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ store.Close()
}
- }
- var err error
- // each node except the last one subscribes to the upstream swarm chunk server stream
- // which responds to chunk retrieve requests
- for j := 0; j < nodes-1; j++ {
- id := sim.IDs[j]
- err = sim.CallClient(id, func(client *rpc.Client) error {
- doneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
- if err != nil {
- return err
- }
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-doneC
- rpcSubscriptionsWg.Done()
- }()
- ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
- defer cancel()
- sid := sim.IDs[j+1] // the upstream peer's id
- return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
+
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ DoSync: true,
+ SyncUpdateDelay: 0,
})
- if err != nil {
- break
+
+ retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
+ return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
- }
- initC <- err
- return nil
- }
+ netStore := storage.NewNetStore(localStore, retrieveFunc)
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+ bucket.Store(bucketKeyFileStore, fileStore)
- // the check function is only triggered when the benchmark finishes
- trigger := make(chan discover.NodeID)
- check := func(ctx context.Context, id discover.NodeID) (_ bool, err error) {
- return true, nil
- }
+ return r, cleanup, nil
- conf.Step = &simulations.Step{
- Action: action,
- Trigger: trigger,
- // we are only testing the pivot node (net.Nodes[0])
- Expect: &simulations.Expectation{
- Nodes: sim.IDs[0:1],
- Check: check,
},
- }
-
- // run the simulation in the background
- errc := make(chan error)
- go func() {
- _, err := sim.Run(ctx, conf)
- close(quitC)
- errc <- err
- }()
+ })
+ defer sim.Close()
- // wait for simulation action to complete stream subscriptions
- err = <-initC
+ log.Info("Initializing test config")
+ _, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
- b.Fatalf("simulation failed to initialise. expected no error. got %v", err)
+ b.Fatal(err)
}
- // create a retriever FileStore for the pivot node
- // by now deliveries are set for each node by the streamer service
- delivery := deliveries[sim.IDs[0]]
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
-
- // benchmark loop
- b.ResetTimer()
- b.StopTimer()
-Loop:
- for i := 0; i < b.N; i++ {
- // uploading chunkCount random chunks to the last node
- hashes := make([]storage.Address, chunkCount)
- for i := 0; i < chunkCount; i++ {
- // create actual size real chunks
- ctx := context.TODO()
- hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
- if err != nil {
- b.Fatalf("expected no error. got %v", err)
- }
- // wait until all chunks stored
- err = wait(ctx)
- if err != nil {
- b.Fatalf("expected no error. got %v", err)
- }
- // collect the hashes
- hashes[i] = hash
+ ctx := context.Background()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ node := nodeIDs[len(nodeIDs)-1]
+
+ item, ok := sim.NodeItem(node, bucketKeyFileStore)
+ if !ok {
+ b.Fatal("No filestore")
}
- // now benchmark the actual retrieval
- // netstore.Get is called for each hash in a go routine and errors are collected
- b.StartTimer()
- errs := make(chan error)
- for _, hash := range hashes {
- go func(h storage.Address) {
- _, err := netStore.Get(ctx, h)
- log.Warn("test check netstore get", "hash", h, "err", err)
- errs <- err
- }(hash)
+ remoteFileStore := item.(*storage.FileStore)
+
+ pivotNode := nodeIDs[0]
+ item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
+ if !ok {
+ b.Fatal("No filestore")
}
- // count and report retrieval errors
- // if there are misses then chunk timeout is too low for the distance and volume (?)
- var total, misses int
- for err := range errs {
- if err != nil {
- log.Warn(err.Error())
- misses++
- }
- total++
- if total == chunkCount {
- break
- }
+ netStore := item.(*storage.NetStore)
+
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
}
+
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ if d.Error != nil {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ b.Fatal(d.Error)
+ }
+ }
+ }()
+ // benchmark loop
+ b.ResetTimer()
b.StopTimer()
+ Loop:
+ for i := 0; i < b.N; i++ {
+ // uploading chunkCount random chunks to the last node
+ hashes := make([]storage.Address, chunkCount)
+ for i := 0; i < chunkCount; i++ {
+ // create actual size real chunks
+ ctx := context.TODO()
+ hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
+ if err != nil {
+ b.Fatalf("expected no error. got %v", err)
+ }
+ // wait until all chunks stored
+ err = wait(ctx)
+ if err != nil {
+ b.Fatalf("expected no error. got %v", err)
+ }
+ // collect the hashes
+ hashes[i] = hash
+ }
+ // now benchmark the actual retrieval
+ // netstore.Get is called for each hash in a go routine and errors are collected
+ b.StartTimer()
+ errs := make(chan error)
+ for _, hash := range hashes {
+ go func(h storage.Address) {
+ _, err := netStore.Get(ctx, h)
+ log.Warn("test check netstore get", "hash", h, "err", err)
+ errs <- err
+ }(hash)
+ }
+ // count and report retrieval errors
+ // if there are misses then chunk timeout is too low for the distance and volume (?)
+ var total, misses int
+ for err := range errs {
+ if err != nil {
+ log.Warn(err.Error())
+ misses++
+ }
+ total++
+ if total == chunkCount {
+ break
+ }
+ }
+ b.StopTimer()
- select {
- case err = <-disconnectC:
- if err != nil {
+ if misses > 0 {
+ err = fmt.Errorf("%v chunk not found out of %v", misses, total)
break Loop
}
- default:
- }
-
- if misses > 0 {
- err = fmt.Errorf("%v chunk not found out of %v", misses, total)
- break Loop
}
- }
-
- select {
- case <-quitC:
- case trigger <- sim.IDs[0]:
- }
- if err == nil {
- err = <-errc
- } else {
- if e := <-errc; e != nil {
- b.Errorf("sim.Run function error: %v", e)
+ if err != nil {
+ b.Fatal(err)
}
+ return nil
+ })
+ if result.Error != nil {
+ b.Fatal(result.Error)
}
- // benchmark over, trigger the check function to conclude the simulation
- if err != nil {
- b.Fatalf("expected no error. got %v", err)
- }
-}
-
-func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
- return stores[id], nil
}
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index d996cdc7e..f4294134b 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -22,52 +22,22 @@ import (
"encoding/binary"
"fmt"
"io"
+ "os"
"sync"
"testing"
"time"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
- streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
-var (
- externalStreamName = "externalStream"
- externalStreamSessionAt uint64 = 50
- externalStreamMaxKeys uint64 = 100
-)
-
-func newIntervalsStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
- id := ctx.Config.ID
- addr := toAddr(id)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- store := stores[id].(*storage.LocalStore)
- db := storage.NewDBAPI(store)
- delivery := NewDelivery(kad, db)
- deliveries[id] = delivery
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: defaultSkipCheck,
- })
-
- r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return newTestExternalClient(db), nil
- })
- r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
- return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
- })
-
- go func() {
- waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id))
- }()
- return &TestExternalRegistry{r}, nil
-}
-
func TestIntervals(t *testing.T) {
testIntervals(t, true, nil, false)
testIntervals(t, false, NewRange(9, 26), false)
@@ -81,237 +51,337 @@ func TestIntervals(t *testing.T) {
func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
nodes := 2
chunkCount := dataChunkCount
+ externalStreamName := "externalStream"
+ externalStreamSessionAt := uint64(50)
+ externalStreamMaxKeys := uint64(100)
- defer setDefaultSkipCheck(defaultSkipCheck)
- defaultSkipCheck = skipCheck
-
- toAddr = network.NewAddrFromNodeID
- conf := &streamTesting.RunConfig{
- Adapter: *adapter,
- NodeCount: nodes,
- ConnLevel: 1,
- ToAddr: toAddr,
- Services: services,
- DefaultService: "intervalsStreamer",
- }
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- sim, teardown, err := streamTesting.NewSimulation(conf)
- var rpcSubscriptionsWg sync.WaitGroup
- defer func() {
- rpcSubscriptionsWg.Wait()
- teardown()
- }()
- if err != nil {
- t.Fatal(err)
- }
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ store, datadir, err := createTestLocalStorageForID(id, addr)
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ store.Close()
+ os.RemoveAll(datadir)
+ }
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
- stores = make(map[discover.NodeID]storage.ChunkStore)
- deliveries = make(map[discover.NodeID]*Delivery)
- for i, id := range sim.IDs {
- stores[id] = sim.Stores[i]
- }
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ })
+ bucket.Store(bucketKeyRegistry, r)
- peerCount = func(id discover.NodeID) int {
- return 1
- }
+ r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
+ return newTestExternalClient(db), nil
+ })
+ r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
+ return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
+ })
- fileStore := storage.NewFileStore(sim.Stores[0], storage.NewFileStoreParams())
- size := chunkCount * chunkSize
- ctx := context.TODO()
- _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
- if err != nil {
- t.Fatal(err)
- }
- err = wait(ctx)
+ fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ return r, cleanup, nil
+
+ },
+ })
+ defer sim.Close()
+
+ log.Info("Adding nodes to simulation")
+ _, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
t.Fatal(err)
}
- errc := make(chan error, 1)
- waitPeerErrC = make(chan error)
- quitC := make(chan struct{})
- defer close(quitC)
+ ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
+ defer cancel()
- action := func(ctx context.Context) error {
- i := 0
- for err := range waitPeerErrC {
- if err != nil {
- return fmt.Errorf("error waiting for peers: %s", err)
- }
- i++
- if i == nodes {
- break
- }
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ storer := nodeIDs[0]
+ checker := nodeIDs[1]
+
+ item, ok := sim.NodeItem(storer, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
}
+ fileStore := item.(*storage.FileStore)
- id := sim.IDs[1]
+ size := chunkCount * chunkSize
+ _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
+ if err != nil {
+ log.Error("Store error: %v", "err", err)
+ t.Fatal(err)
+ }
+ err = wait(ctx)
+ if err != nil {
+ log.Error("Wait error: %v", "err", err)
+ t.Fatal(err)
+ }
- err := sim.CallClient(id, func(client *rpc.Client) error {
+ item, ok = sim.NodeItem(checker, bucketKeyRegistry)
+ if !ok {
+ return fmt.Errorf("No registry")
+ }
+ registry := item.(*Registry)
- sid := sim.IDs[0]
+ liveErrC := make(chan error)
+ historyErrC := make(chan error)
- doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
- if err != nil {
- return err
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ log.Error("WaitKademlia error: %v", "err", err)
+ return err
+ }
+
+ log.Debug("Watching for disconnections")
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ if d.Error != nil {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal(d.Error)
+ }
}
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-doneC
- rpcSubscriptionsWg.Done()
+ }()
+
+ go func() {
+ if !live {
+ close(liveErrC)
+ return
+ }
+
+ var err error
+ defer func() {
+ liveErrC <- err
}()
- ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
- defer cancel()
- err = client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(externalStreamName, "", live), history, Top)
+ // live stream
+ var liveHashesChan chan []byte
+ liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
- return err
+ log.Error("Subscription error: %v", "err", err)
+ return
}
+ i := externalStreamSessionAt
- liveErrC := make(chan error)
- historyErrC := make(chan error)
+ // we have subscribed, enable notifications
+ err = enableNotifications(registry, storer, NewStream(externalStreamName, "", true))
+ if err != nil {
+ return
+ }
- go func() {
- if !live {
- close(liveErrC)
+ for {
+ select {
+ case hash := <-liveHashesChan:
+ h := binary.BigEndian.Uint64(hash)
+ if h != i {
+ err = fmt.Errorf("expected live hash %d, got %d", i, h)
+ return
+ }
+ i++
+ if i > externalStreamMaxKeys {
+ return
+ }
+ case <-ctx.Done():
return
}
+ }
+ }()
- var err error
- defer func() {
- liveErrC <- err
- }()
+ go func() {
+ if live && history == nil {
+ close(historyErrC)
+ return
+ }
- // live stream
- liveHashesChan := make(chan []byte)
- liveSubscription, err := client.Subscribe(ctx, "stream", liveHashesChan, "getHashes", sid, NewStream(externalStreamName, "", true))
- if err != nil {
- return
- }
- defer liveSubscription.Unsubscribe()
+ var err error
+ defer func() {
+ historyErrC <- err
+ }()
- i := externalStreamSessionAt
+ // history stream
+ var historyHashesChan chan []byte
+ historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
+ if err != nil {
+ return
+ }
- // we have subscribed, enable notifications
- err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", true))
- if err != nil {
- return
+ var i uint64
+ historyTo := externalStreamMaxKeys
+ if history != nil {
+ i = history.From
+ if history.To != 0 {
+ historyTo = history.To
}
+ }
- for {
- select {
- case hash := <-liveHashesChan:
- h := binary.BigEndian.Uint64(hash)
- if h != i {
- err = fmt.Errorf("expected live hash %d, got %d", i, h)
- return
- }
- i++
- if i > externalStreamMaxKeys {
- return
- }
- case err = <-liveSubscription.Err():
+ // we have subscribed, enable notifications
+ err = enableNotifications(registry, storer, NewStream(externalStreamName, "", false))
+ if err != nil {
+ return
+ }
+
+ for {
+ select {
+ case hash := <-historyHashesChan:
+ h := binary.BigEndian.Uint64(hash)
+ if h != i {
+ err = fmt.Errorf("expected history hash %d, got %d", i, h)
return
- case <-ctx.Done():
+ }
+ i++
+ if i > historyTo {
return
}
- }
- }()
-
- go func() {
- if live && history == nil {
- close(historyErrC)
+ case <-ctx.Done():
return
}
+ }
+ }()
- var err error
- defer func() {
- historyErrC <- err
- }()
+ err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
+ if err != nil {
+ return err
+ }
+ if err := <-liveErrC; err != nil {
+ return err
+ }
+ if err := <-historyErrC; err != nil {
+ return err
+ }
- // history stream
- historyHashesChan := make(chan []byte)
- historySubscription, err := client.Subscribe(ctx, "stream", historyHashesChan, "getHashes", sid, NewStream(externalStreamName, "", false))
- if err != nil {
- return
- }
- defer historySubscription.Unsubscribe()
-
- var i uint64
- historyTo := externalStreamMaxKeys
- if history != nil {
- i = history.From
- if history.To != 0 {
- historyTo = history.To
- }
- }
+ return nil
+ })
- // we have subscribed, enable notifications
- err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", false))
- if err != nil {
- return
- }
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+}
- for {
- select {
- case hash := <-historyHashesChan:
- h := binary.BigEndian.Uint64(hash)
- if h != i {
- err = fmt.Errorf("expected history hash %d, got %d", i, h)
- return
- }
- i++
- if i > historyTo {
- return
- }
- case err = <-historySubscription.Err():
- return
- case <-ctx.Done():
- return
- }
- }
- }()
+func getHashes(ctx context.Context, r *Registry, peerID discover.NodeID, s Stream) (chan []byte, error) {
+ peer := r.getPeer(peerID)
- if err := <-liveErrC; err != nil {
- return err
- }
- if err := <-historyErrC; err != nil {
- return err
- }
+ client, err := peer.getClient(ctx, s)
+ if err != nil {
+ return nil, err
+ }
+
+ c := client.Client.(*testExternalClient)
+
+ return c.hashes, nil
+}
+
+func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error {
+ peer := r.getPeer(peerID)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
- return nil
- })
+ client, err := peer.getClient(ctx, s)
+ if err != nil {
return err
}
- check := func(ctx context.Context, id discover.NodeID) (bool, error) {
- select {
- case err := <-errc:
- return false, err
- case <-ctx.Done():
- return false, ctx.Err()
- default:
+
+ close(client.Client.(*testExternalClient).enableNotificationsC)
+
+ return nil
+}
+
+type testExternalClient struct {
+ hashes chan []byte
+ db *storage.DBAPI
+ enableNotificationsC chan struct{}
+}
+
+func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
+ return &testExternalClient{
+ hashes: make(chan []byte),
+ db: db,
+ enableNotificationsC: make(chan struct{}),
+ }
+}
+
+func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
+ chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
+ if chunk.ReqC == nil {
+ return nil
+ }
+ c.hashes <- hash
+ //NOTE: This was failing on go1.9.x with a deadlock.
+ //Sometimes this function would just block
+ //It is commented now, but it may be well worth after the chunk refactor
+ //to re-enable this and see if the problem has been addressed
+ /*
+ return func() {
+ return chunk.WaitToStore()
}
- return true, nil
+ */
+ return nil
+}
+
+func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
+ return nil
+}
+
+func (c *testExternalClient) Close() {}
+
+const testExternalServerBatchSize = 10
+
+type testExternalServer struct {
+ t string
+ keyFunc func(key []byte, index uint64)
+ sessionAt uint64
+ maxKeys uint64
+}
+
+func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
+ if keyFunc == nil {
+ keyFunc = binary.BigEndian.PutUint64
}
+ return &testExternalServer{
+ t: t,
+ keyFunc: keyFunc,
+ sessionAt: sessionAt,
+ maxKeys: maxKeys,
+ }
+}
- conf.Step = &simulations.Step{
- Action: action,
- Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]),
- Expect: &simulations.Expectation{
- Nodes: sim.IDs[1:1],
- Check: check,
- },
+func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
+ if from == 0 && to == 0 {
+ from = s.sessionAt
+ to = s.sessionAt + testExternalServerBatchSize
}
- startedAt := time.Now()
- timeout := 300 * time.Second
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- result, err := sim.Run(ctx, conf)
- finishedAt := time.Now()
- if err != nil {
- t.Fatalf("Setting up simulation failed: %v", err)
+ if to-from > testExternalServerBatchSize {
+ to = from + testExternalServerBatchSize - 1
}
- if result.Error != nil {
- t.Fatalf("Simulation failed: %s", result.Error)
+ if from >= s.maxKeys && to > s.maxKeys {
+ return nil, 0, 0, nil, io.EOF
+ }
+ if to > s.maxKeys {
+ to = s.maxKeys
+ }
+ b := make([]byte, HashSize*(to-from+1))
+ for i := from; i <= to; i++ {
+ s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
}
- streamTesting.CheckResult(t, result, startedAt, finishedAt)
+ return b, from, to, nil, nil
}
+
+func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
+ return make([]byte, 4096), nil
+}
+
+func (s *testExternalServer) Close() {}
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 9961a0bc7..4ff947b21 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -17,20 +17,19 @@ package stream
import (
"context"
- crand "crypto/rand"
"fmt"
- "math/rand"
- "strings"
+ "os"
"sync"
"testing"
"time"
- "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
- streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
@@ -40,40 +39,6 @@ const (
maxFileSize = 40
)
-func initRetrievalTest() {
- //global func to get overlay address from discover ID
- toAddr = func(id discover.NodeID) *network.BzzAddr {
- addr := network.NewAddrFromNodeID(id)
- return addr
- }
- //global func to create local store
- createStoreFunc = createTestLocalStorageForId
- //local stores
- stores = make(map[discover.NodeID]storage.ChunkStore)
- //data directories for each node and store
- datadirs = make(map[discover.NodeID]string)
- //deliveries for each node
- deliveries = make(map[discover.NodeID]*Delivery)
- //global retrieve func
- getRetrieveFunc = func(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error {
- return func(ctx context.Context, chunk *storage.Chunk) error {
- skipCheck := true
- return deliveries[id].RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- }
- //registries, map of discover.NodeID to its streamer
- registries = make(map[discover.NodeID]*TestRegistry)
- //not needed for this test but required from common_test for NewStreamService
- waitPeerErrC = make(chan error)
- //also not needed for this test but required for NewStreamService
- peerCount = func(id discover.NodeID) int {
- if ids[0] == id || ids[len(ids)-1] == id {
- return 1
- }
- return 2
- }
-}
-
//This test is a retrieval test for nodes.
//A configurable number of nodes can be
//provided to the test.
@@ -81,7 +46,10 @@ func initRetrievalTest() {
//Number of nodes can be provided via commandline too.
func TestFileRetrieval(t *testing.T) {
if *nodes != 0 {
- fileRetrievalTest(t, *nodes)
+ err := runFileRetrievalTest(*nodes)
+ if err != nil {
+ t.Fatal(err)
+ }
} else {
nodeCnt := []int{16}
//if the `longrunning` flag has been provided
@@ -90,7 +58,10 @@ func TestFileRetrieval(t *testing.T) {
nodeCnt = append(nodeCnt, 32, 64, 128)
}
for _, n := range nodeCnt {
- fileRetrievalTest(t, n)
+ err := runFileRetrievalTest(n)
+ if err != nil {
+ t.Fatal(err)
+ }
}
}
}
@@ -105,7 +76,10 @@ func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
- retrievalTest(t, *chunks, *nodes)
+ err := runRetrievalTest(*chunks, *nodes)
+ if err != nil {
+ t.Fatal(err)
+ }
} else {
var nodeCnt []int
var chnkCnt []int
@@ -121,76 +95,17 @@ func TestRetrieval(t *testing.T) {
}
for _, n := range nodeCnt {
for _, c := range chnkCnt {
- retrievalTest(t, c, n)
+ err := runRetrievalTest(c, n)
+ if err != nil {
+ t.Fatal(err)
+ }
}
}
}
}
-//Every test runs 3 times, a live, a history, and a live AND history
-func fileRetrievalTest(t *testing.T, nodeCount int) {
- //test live and NO history
- log.Info("Testing live and no history", "nodeCount", nodeCount)
- live = true
- history = false
- err := runFileRetrievalTest(nodeCount)
- if err != nil {
- t.Fatal(err)
- }
- //test history only
- log.Info("Testing history only", "nodeCount", nodeCount)
- live = false
- history = true
- err = runFileRetrievalTest(nodeCount)
- if err != nil {
- t.Fatal(err)
- }
- //finally test live and history
- log.Info("Testing live and history", "nodeCount", nodeCount)
- live = true
- err = runFileRetrievalTest(nodeCount)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-//Every test runs 3 times, a live, a history, and a live AND history
-func retrievalTest(t *testing.T, chunkCount int, nodeCount int) {
- //test live and NO history
- log.Info("Testing live and no history", "chunkCount", chunkCount, "nodeCount", nodeCount)
- live = true
- history = false
- err := runRetrievalTest(chunkCount, nodeCount)
- if err != nil {
- t.Fatal(err)
- }
- //test history only
- log.Info("Testing history only", "chunkCount", chunkCount, "nodeCount", nodeCount)
- live = false
- history = true
- err = runRetrievalTest(chunkCount, nodeCount)
- if err != nil {
- t.Fatal(err)
- }
- //finally test live and history
- log.Info("Testing live and history", "chunkCount", chunkCount, "nodeCount", nodeCount)
- live = true
- err = runRetrievalTest(chunkCount, nodeCount)
- if err != nil {
- t.Fatal(err)
- }
-}
-
/*
-The upload is done by dependency to the global
-`live` and `history` variables;
-
-If `live` is set, first stream subscriptions are established,
-then files are uploaded to nodes.
-
-If `history` is enabled, first upload files, then build up subscriptions.
-
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. Nevertheless a health check runs in the
@@ -199,261 +114,129 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
func runFileRetrievalTest(nodeCount int) error {
- //for every run (live, history), int the variables
- initRetrievalTest()
- //the ids of the snapshot nodes, initiate only now as we need nodeCount
- ids = make([]discover.NodeID, nodeCount)
- //channel to check for disconnection errors
- disconnectC := make(chan error)
- //channel to close disconnection watcher routine
- quitC := make(chan struct{})
- //the test conf (using same as in `snapshot_sync_test`
- conf = &synctestConfig{}
- //map of overlay address to discover ID
- conf.addrToIdMap = make(map[string]discover.NodeID)
- //array where the generated chunk hashes will be stored
- conf.hashes = make([]storage.Address, 0)
- //load nodes from the snapshot file
- net, err := initNetWithSnapshot(nodeCount)
- if err != nil {
- return err
- }
- var rpcSubscriptionsWg sync.WaitGroup
- //do cleanup after test is terminated
- defer func() {
- //shutdown the snapshot network
- net.Shutdown()
- //after the test, clean up local stores initialized with createLocalStoreForId
- localStoreCleanup()
- //finally clear all data directories
- datadirsCleanup()
- }()
- //get the nodes of the network
- nodes := net.GetNodes()
- //iterate over all nodes...
- for c := 0; c < len(nodes); c++ {
- //create an array of discovery nodeIDS
- ids[c] = nodes[c].ID()
- a := network.ToOverlayAddr(ids[c].Bytes())
- //append it to the array of all overlay addresses
- conf.addrs = append(conf.addrs, a)
- conf.addrToIdMap[string(a)] = ids[c]
- }
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- //needed for healthy call
- ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
-
- //an array for the random files
- var randomFiles []string
- //channel to signal when the upload has finished
- uploadFinished := make(chan struct{})
- //channel to trigger new node checks
- trigger := make(chan discover.NodeID)
- //simulation action
- action := func(ctx context.Context) error {
- //first run the health check on all nodes,
- //wait until nodes are all healthy
- ticker := time.NewTicker(200 * time.Millisecond)
- defer ticker.Stop()
- for range ticker.C {
- healthy := true
- for _, id := range ids {
- r := registries[id]
- //PeerPot for this node
- addr := common.Bytes2Hex(r.addr.OAddr)
- pp := ppmap[addr]
- //call Healthy RPC
- h := r.delivery.overlay.Healthy(pp)
- //print info
- log.Debug(r.delivery.overlay.String())
- log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
- if !h.GotNN || !h.Full {
- healthy = false
- break
- }
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ store, datadir, err := createTestLocalStorageForID(id, addr)
+ if err != nil {
+ return nil, nil, err
}
- if healthy {
- break
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ store.Close()
}
- }
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
- if history {
- log.Info("Uploading for history")
- //If testing only history, we upload the chunk(s) first
- conf.hashes, randomFiles, err = uploadFilesToNodes(nodes)
- if err != nil {
- return err
- }
- }
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ DoSync: true,
+ SyncUpdateDelay: 3 * time.Second,
+ })
- //variables needed to wait for all subscriptions established before uploading
- errc := make(chan error)
-
- //now setup and start event watching in order to know when we can upload
- ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
- defer watchCancel()
-
- log.Info("Setting up stream subscription")
- //We need two iterations, one to subscribe to the subscription events
- //(so we know when setup phase is finished), and one to
- //actually run the stream subscriptions. We can't do it in the same iteration,
- //because while the first nodes in the loop are setting up subscriptions,
- //the latter ones have not subscribed to listen to peer events yet,
- //and then we miss events.
-
- //first iteration: setup disconnection watcher and subscribe to peer events
- for j, id := range ids {
- log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
- client, err := net.GetNode(id).Client()
- if err != nil {
- return err
- }
- wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
- // doneC is nil, the error happened which is sent to errc channel, already
- if wsDoneC == nil {
- continue
- }
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-wsDoneC
- rpcSubscriptionsWg.Done()
- }()
-
- //watch for peers disconnecting
- wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
- if err != nil {
- return err
- }
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-wdDoneC
- rpcSubscriptionsWg.Done()
- }()
- }
+ fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ bucket.Store(bucketKeyFileStore, fileStore)
- //second iteration: start syncing and setup stream subscriptions
- for j, id := range ids {
- log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j))
- client, err := net.GetNode(id).Client()
- if err != nil {
- return err
- }
- //start syncing!
- var cnt int
- err = client.CallContext(ctx, &cnt, "stream_startSyncing")
- if err != nil {
- return err
- }
- //increment the number of subscriptions we need to wait for
- //by the count returned from startSyncing (SYNC subscriptions)
- subscriptionCount += cnt
- //now also add the number of RETRIEVAL_REQUEST subscriptions
- for snid := range registries[id].peers {
- subscriptionCount++
- err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top)
- if err != nil {
- return err
- }
- }
- }
+ return r, cleanup, nil
- //now wait until the number of expected subscriptions has been finished
- //`watchSubscriptionEvents` will write with a `nil` value to errc
- //every time a `SubscriptionMsg` has been received
- for err := range errc {
- if err != nil {
- return err
- }
- //`nil` received, decrement count
- subscriptionCount--
- //all subscriptions received
- if subscriptionCount == 0 {
- break
- }
- }
+ },
+ })
+ defer sim.Close()
- log.Info("Stream subscriptions successfully requested, action terminated")
+ log.Info("Initializing test config")
- if live {
- //upload generated files to nodes
- var hashes []storage.Address
- var rfiles []string
- hashes, rfiles, err = uploadFilesToNodes(nodes)
- if err != nil {
- return err
- }
- conf.hashes = append(conf.hashes, hashes...)
- randomFiles = append(randomFiles, rfiles...)
- //signal to the trigger loop that the upload has finished
- uploadFinished <- struct{}{}
- }
+ conf := &synctestConfig{}
+ //map of discover ID to indexes of chunks expected at that ID
+ conf.idToChunksMap = make(map[discover.NodeID][]int)
+ //map of overlay address to discover ID
+ conf.addrToIDMap = make(map[string]discover.NodeID)
+ //array where the generated chunk hashes will be stored
+ conf.hashes = make([]storage.Address, 0)
- return nil
+ err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ if err != nil {
+ return err
}
- //check defines what will be checked during the test
- check := func(ctx context.Context, id discover.NodeID) (bool, error) {
-
- select {
- case <-ctx.Done():
- return false, ctx.Err()
- case e := <-disconnectC:
- log.Error(e.Error())
- return false, fmt.Errorf("Disconnect event detected, network unhealthy")
- default:
- }
- log.Trace(fmt.Sprintf("Checking node: %s", id))
- //if there are more than one chunk, test only succeeds if all expected chunks are found
- allSuccess := true
-
- //check on the node's FileStore (netstore)
- fileStore := registries[id].fileStore
- //check all chunks
- for i, hash := range conf.hashes {
- reader, _ := fileStore.Retrieve(context.TODO(), hash)
- //check that we can read the file size and that it corresponds to the generated file size
- if s, err := reader.Size(context.TODO(), nil); err != nil || s != int64(len(randomFiles[i])) {
- allSuccess = false
- log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
- } else {
- log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
- }
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ defer cancelSimRun()
+
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ for _, n := range nodeIDs {
+ //get the kademlia overlay address from this ID
+ a := network.ToOverlayAddr(n.Bytes())
+ //append it to the array of all overlay addresses
+ conf.addrs = append(conf.addrs, a)
+ //the proximity calculation is on overlay addr,
+ //the p2p/simulations check func triggers on discover.NodeID,
+ //so we need to know which overlay addr maps to which nodeID
+ conf.addrToIDMap[string(a)] = n
}
- return allSuccess, nil
- }
+ //an array for the random files
+ var randomFiles []string
+ //channel to signal when the upload has finished
+ //uploadFinished := make(chan struct{})
+ //channel to trigger new node checks
- //for each tick, run the checks on all nodes
- timingTicker := time.NewTicker(5 * time.Second)
- defer timingTicker.Stop()
- go func() {
- //for live upload, we should wait for uploads to have finished
- //before starting to trigger the checks, due to file size
- if live {
- <-uploadFinished
+ conf.hashes, randomFiles, err = uploadFilesToNodes(sim)
+ if err != nil {
+ return err
}
- for range timingTicker.C {
- for i := 0; i < len(ids); i++ {
- log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
- trigger <- ids[i]
- }
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
}
- }()
-
- log.Info("Starting simulation run...")
-
- timeout := MaxTimeout * time.Second
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- //run the simulation
- result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
- Action: action,
- Trigger: trigger,
- Expect: &simulations.Expectation{
- Nodes: ids,
- Check: check,
- },
+ // File retrieval check is repeated until all uploaded files are retrieved from all nodes
+ // or until the timeout is reached.
+ allSuccess := false
+ for !allSuccess {
+ for _, id := range nodeIDs {
+ //for each expected chunk, check if it is in the local store
+ localChunks := conf.idToChunksMap[id]
+ localSuccess := true
+ for _, ch := range localChunks {
+ //get the real chunk by the index in the index array
+ chunk := conf.hashes[ch]
+ log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
+ //check if the expected chunk is indeed in the localstore
+ var err error
+ //check on the node's FileStore (netstore)
+ item, ok := sim.NodeItem(id, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No registry")
+ }
+ fileStore := item.(*storage.FileStore)
+ //check all chunks
+ for i, hash := range conf.hashes {
+ reader, _ := fileStore.Retrieve(context.TODO(), hash)
+ //check that we can read the file size and that it corresponds to the generated file size
+ if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
+ allSuccess = false
+ log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
+ } else {
+ log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
+ }
+ }
+ if err != nil {
+ log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
+ localSuccess = false
+ } else {
+ log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+ }
+ }
+ allSuccess = localSuccess
+ }
+ }
+ if !allSuccess {
+ return fmt.Errorf("Not all chunks succeeded!")
+ }
+ return nil
})
if result.Error != nil {
@@ -466,14 +249,6 @@ func runFileRetrievalTest(nodeCount int) error {
/*
The test generates the given number of chunks.
-The upload is done by dependency to the global
-`live` and `history` variables;
-
-If `live` is set, first stream subscriptions are established, then
-upload to a random node.
-
-If `history` is enabled, first upload then build up subscriptions.
-
The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. Nevertheless a health check runs in the
@@ -482,259 +257,129 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
func runRetrievalTest(chunkCount int, nodeCount int) error {
- //for every run (live, history), int the variables
- initRetrievalTest()
- //the ids of the snapshot nodes, initiate only now as we need nodeCount
- ids = make([]discover.NodeID, nodeCount)
- //channel to check for disconnection errors
- disconnectC := make(chan error)
- //channel to close disconnection watcher routine
- quitC := make(chan struct{})
- //the test conf (using same as in `snapshot_sync_test`
- conf = &synctestConfig{}
- //map of overlay address to discover ID
- conf.addrToIdMap = make(map[string]discover.NodeID)
- //array where the generated chunk hashes will be stored
- conf.hashes = make([]storage.Address, 0)
- //load nodes from the snapshot file
- net, err := initNetWithSnapshot(nodeCount)
- if err != nil {
- return err
- }
- var rpcSubscriptionsWg sync.WaitGroup
- //do cleanup after test is terminated
- defer func() {
- //shutdown the snapshot network
- net.Shutdown()
- //after the test, clean up local stores initialized with createLocalStoreForId
- localStoreCleanup()
- //finally clear all data directories
- datadirsCleanup()
- }()
- //get the nodes of the network
- nodes := net.GetNodes()
- //select one index at random...
- idx := rand.Intn(len(nodes))
- //...and get the the node at that index
- //this is the node selected for upload
- uploadNode := nodes[idx]
- //iterate over all nodes...
- for c := 0; c < len(nodes); c++ {
- //create an array of discovery nodeIDS
- ids[c] = nodes[c].ID()
- a := network.ToOverlayAddr(ids[c].Bytes())
- //append it to the array of all overlay addresses
- conf.addrs = append(conf.addrs, a)
- conf.addrToIdMap[string(a)] = ids[c]
- }
-
- //needed for healthy call
- ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
-
- trigger := make(chan discover.NodeID)
- //simulation action
- action := func(ctx context.Context) error {
- //first run the health check on all nodes,
- //wait until nodes are all healthy
- ticker := time.NewTicker(200 * time.Millisecond)
- defer ticker.Stop()
- for range ticker.C {
- healthy := true
- for _, id := range ids {
- r := registries[id]
- //PeerPot for this node
- addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
- pp := ppmap[addr]
- //call Healthy RPC
- h := r.delivery.overlay.Healthy(pp)
- //print info
- log.Debug(r.delivery.overlay.String())
- log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
- if !h.GotNN || !h.Full {
- healthy = false
- break
- }
- }
- if healthy {
- break
- }
- }
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- if history {
- log.Info("Uploading for history")
- //If testing only history, we upload the chunk(s) first
- conf.hashes, err = uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount)
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
- return err
+ return nil, nil, err
}
- }
-
- //variables needed to wait for all subscriptions established before uploading
- errc := make(chan error)
-
- //now setup and start event watching in order to know when we can upload
- ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
- defer watchCancel()
-
- log.Info("Setting up stream subscription")
- //We need two iterations, one to subscribe to the subscription events
- //(so we know when setup phase is finished), and one to
- //actually run the stream subscriptions. We can't do it in the same iteration,
- //because while the first nodes in the loop are setting up subscriptions,
- //the latter ones have not subscribed to listen to peer events yet,
- //and then we miss events.
-
- //first iteration: setup disconnection watcher and subscribe to peer events
- for j, id := range ids {
- log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
- client, err := net.GetNode(id).Client()
- if err != nil {
- return err
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ store.Close()
}
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
- //check for `SubscribeMsg` events to know when setup phase is complete
- wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
- // doneC is nil, the error happened which is sent to errc channel, already
- if wsDoneC == nil {
- continue
- }
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-wsDoneC
- rpcSubscriptionsWg.Done()
- }()
-
- //watch for peers disconnecting
- wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
- if err != nil {
- return err
- }
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-wdDoneC
- rpcSubscriptionsWg.Done()
- }()
- }
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ DoSync: true,
+ SyncUpdateDelay: 0,
+ })
- //second iteration: start syncing and setup stream subscriptions
- for j, id := range ids {
- log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j))
- client, err := net.GetNode(id).Client()
- if err != nil {
- return err
- }
- //start syncing!
- var cnt int
- err = client.CallContext(ctx, &cnt, "stream_startSyncing")
- if err != nil {
- return err
- }
- //increment the number of subscriptions we need to wait for
- //by the count returned from startSyncing (SYNC subscriptions)
- subscriptionCount += cnt
- //now also add the number of RETRIEVAL_REQUEST subscriptions
- for snid := range registries[id].peers {
- subscriptionCount++
- err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top)
- if err != nil {
- return err
- }
- }
- }
+ fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ bucketKeyFileStore = simulation.BucketKey("filestore")
+ bucket.Store(bucketKeyFileStore, fileStore)
- //now wait until the number of expected subscriptions has been finished
- //`watchSubscriptionEvents` will write with a `nil` value to errc
- //every time a `SubscriptionMsg` has been received
- for err := range errc {
- if err != nil {
- return err
- }
- //`nil` received, decrement count
- subscriptionCount--
- //all subscriptions received
- if subscriptionCount == 0 {
- break
- }
- }
+ return r, cleanup, nil
- log.Info("Stream subscriptions successfully requested, action terminated")
+ },
+ })
+ defer sim.Close()
- if live {
- //now upload the chunks to the selected random single node
- chnks, err := uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount)
- if err != nil {
- return err
- }
- conf.hashes = append(conf.hashes, chnks...)
- }
+ conf := &synctestConfig{}
+ //map of discover ID to indexes of chunks expected at that ID
+ conf.idToChunksMap = make(map[discover.NodeID][]int)
+ //map of overlay address to discover ID
+ conf.addrToIDMap = make(map[string]discover.NodeID)
+ //array where the generated chunk hashes will be stored
+ conf.hashes = make([]storage.Address, 0)
- return nil
+ err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ if err != nil {
+ return err
}
- chunkSize := storage.DefaultChunkSize
-
- //check defines what will be checked during the test
- check := func(ctx context.Context, id discover.NodeID) (bool, error) {
-
- //don't check the uploader node
- if id == uploadNode.ID() {
- return true, nil
+ ctx := context.Background()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ for _, n := range nodeIDs {
+ //get the kademlia overlay address from this ID
+ a := network.ToOverlayAddr(n.Bytes())
+ //append it to the array of all overlay addresses
+ conf.addrs = append(conf.addrs, a)
+ //the proximity calculation is on overlay addr,
+ //the p2p/simulations check func triggers on discover.NodeID,
+ //so we need to know which overlay addr maps to which nodeID
+ conf.addrToIDMap[string(a)] = n
}
- select {
- case <-ctx.Done():
- return false, ctx.Err()
- case e := <-disconnectC:
- log.Error(e.Error())
- return false, fmt.Errorf("Disconnect event detected, network unhealthy")
- default:
+ //an array for the random files
+ var randomFiles []string
+ //this is the node selected for upload
+ node := sim.RandomUpNode()
+ item, ok := sim.NodeItem(node.ID, bucketKeyStore)
+ if !ok {
+ return fmt.Errorf("No localstore")
}
- log.Trace(fmt.Sprintf("Checking node: %s", id))
- //if there are more than one chunk, test only succeeds if all expected chunks are found
- allSuccess := true
-
- //check on the node's FileStore (netstore)
- fileStore := registries[id].fileStore
- //check all chunks
- for _, chnk := range conf.hashes {
- reader, _ := fileStore.Retrieve(context.TODO(), chnk)
- //assuming that reading the Size of the chunk is enough to know we found it
- if s, err := reader.Size(context.TODO(), nil); err != nil || s != chunkSize {
- allSuccess = false
- log.Warn("Retrieve error", "err", err, "chunk", chnk, "nodeId", id)
- } else {
- log.Debug(fmt.Sprintf("Chunk %x found", chnk))
- }
+ lstore := item.(*storage.LocalStore)
+ conf.hashes, err = uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
+ if err != nil {
+ return err
+ }
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
}
- return allSuccess, nil
- }
- //for each tick, run the checks on all nodes
- timingTicker := time.NewTicker(5 * time.Second)
- defer timingTicker.Stop()
- go func() {
- for range timingTicker.C {
- for i := 0; i < len(ids); i++ {
- log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
- trigger <- ids[i]
+ // File retrieval check is repeated until all uploaded files are retrieved from all nodes
+ // or until the timeout is reached.
+ allSuccess := false
+ for !allSuccess {
+ for _, id := range nodeIDs {
+ //for each expected chunk, check if it is in the local store
+ localChunks := conf.idToChunksMap[id]
+ localSuccess := true
+ for _, ch := range localChunks {
+ //get the real chunk by the index in the index array
+ chunk := conf.hashes[ch]
+ log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
+ //check if the expected chunk is indeed in the localstore
+ var err error
+ //check on the node's FileStore (netstore)
+ item, ok := sim.NodeItem(id, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No registry")
+ }
+ fileStore := item.(*storage.FileStore)
+ //check all chunks
+ for i, hash := range conf.hashes {
+ reader, _ := fileStore.Retrieve(context.TODO(), hash)
+ //check that we can read the file size and that it corresponds to the generated file size
+ if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
+ allSuccess = false
+ log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
+ } else {
+ log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
+ }
+ }
+ if err != nil {
+ log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
+ localSuccess = false
+ } else {
+ log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+ }
+ }
+ allSuccess = localSuccess
}
}
- }()
-
- log.Info("Starting simulation run...")
-
- timeout := MaxTimeout * time.Second
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
-
- //run the simulation
- result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
- Action: action,
- Trigger: trigger,
- Expect: &simulations.Expectation{
- Nodes: ids,
- Check: check,
- },
+ if !allSuccess {
+ return fmt.Errorf("Not all chunks succeeded!")
+ }
+ return nil
})
if result.Error != nil {
@@ -743,53 +388,3 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
return nil
}
-
-//upload generated files to nodes
-//every node gets one file uploaded
-func uploadFilesToNodes(nodes []*simulations.Node) ([]storage.Address, []string, error) {
- nodeCnt := len(nodes)
- log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
- //array holding generated files
- rfiles := make([]string, nodeCnt)
- //array holding the root hashes of the files
- rootAddrs := make([]storage.Address, nodeCnt)
-
- var err error
- //for every node, generate a file and upload
- for i, n := range nodes {
- id := n.ID()
- fileStore := registries[id].fileStore
- //generate a file
- rfiles[i], err = generateRandomFile()
- if err != nil {
- return nil, nil, err
- }
- //store it (upload it) on the FileStore
- ctx := context.TODO()
- rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
- log.Debug("Uploaded random string file to node")
- if err != nil {
- return nil, nil, err
- }
- err = wait(ctx)
- if err != nil {
- return nil, nil, err
- }
- rootAddrs[i] = rk
- }
- return rootAddrs, rfiles, nil
-}
-
-//generate a random file (string)
-func generateRandomFile() (string, error) {
- //generate a random file size between minFileSize and maxFileSize
- fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
- log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
- b := make([]byte, fileSize*1024)
- _, err := crand.Read(b)
- if err != nil {
- log.Error("Error generating random file.", "err", err)
- return "", err
- }
- return string(b), nil
-}
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 0b5257c60..2dfc5898f 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -18,12 +18,8 @@ package stream
import (
"context"
crand "crypto/rand"
- "encoding/json"
- "flag"
"fmt"
"io"
- "io/ioutil"
- "math/rand"
"os"
"sync"
"testing"
@@ -31,82 +27,27 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
- streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/pot"
+ "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
const testMinProxBinSize = 2
const MaxTimeout = 600
-var (
- pof = pot.DefaultPof(256)
-
- conf *synctestConfig
- ids []discover.NodeID
- datadirs map[discover.NodeID]string
- ppmap map[string]*network.PeerPot
-
- live bool
- history bool
-
- longrunning = flag.Bool("longrunning", false, "do run long-running tests")
-)
-
type synctestConfig struct {
addrs [][]byte
hashes []storage.Address
idToChunksMap map[discover.NodeID][]int
chunksToNodesMap map[string][]int
- addrToIdMap map[string]discover.NodeID
-}
-
-func init() {
- rand.Seed(time.Now().Unix())
-}
-
-//common_test needs to initialize the test in a init() func
-//in order for adapters to register the NewStreamerService;
-//this service is dependent on some global variables
-//we thus need to initialize first as init() as well.
-func initSyncTest() {
- //assign the toAddr func so NewStreamerService can build the addr
- toAddr = func(id discover.NodeID) *network.BzzAddr {
- addr := network.NewAddrFromNodeID(id)
- return addr
- }
- //global func to create local store
- if *useMockStore {
- createStoreFunc = createMockStore
- } else {
- createStoreFunc = createTestLocalStorageForId
- }
- //local stores
- stores = make(map[discover.NodeID]storage.ChunkStore)
- //data directories for each node and store
- datadirs = make(map[discover.NodeID]string)
- //deliveries for each node
- deliveries = make(map[discover.NodeID]*Delivery)
- //registries, map of discover.NodeID to its streamer
- registries = make(map[discover.NodeID]*TestRegistry)
- //not needed for this test but required from common_test for NewStreamService
- waitPeerErrC = make(chan error)
- //also not needed for this test but required for NewStreamService
- peerCount = func(id discover.NodeID) int {
- if ids[0] == id || ids[len(ids)-1] == id {
- return 1
- }
- return 2
- }
- if *useMockStore {
- createGlobalStore()
- }
+ addrToIDMap map[string]discover.NodeID
}
//This test is a syncing test for nodes.
@@ -116,12 +57,12 @@ func initSyncTest() {
//to the pivot node, and we check that nodes get the chunks
//they are expected to store based on the syncing protocol.
//Number of chunks and nodes can be provided via commandline too.
-func TestSyncing(t *testing.T) {
+func TestSyncingViaGlobalSync(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
- testSyncing(t, *chunks, *nodes)
+ testSyncingViaGlobalSync(t, *chunks, *nodes)
} else {
var nodeCnt []int
var chnkCnt []int
@@ -138,230 +79,279 @@ func TestSyncing(t *testing.T) {
for _, chnk := range chnkCnt {
for _, n := range nodeCnt {
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
- testSyncing(t, chnk, n)
+ testSyncingViaGlobalSync(t, chnk, n)
}
}
}
}
-//Do run the tests
-//Every test runs 3 times, a live, a history, and a live AND history
-func testSyncing(t *testing.T, chunkCount int, nodeCount int) {
- //test live and NO history
- log.Info("Testing live and no history")
- live = true
- history = false
- err := runSyncTest(chunkCount, nodeCount, live, history)
- if err != nil {
- t.Fatal(err)
- }
- //test history only
- log.Info("Testing history only")
- live = false
- history = true
- err = runSyncTest(chunkCount, nodeCount, live, history)
- if err != nil {
- t.Fatal(err)
- }
- //finally test live and history
- log.Info("Testing live and history")
- live = true
- err = runSyncTest(chunkCount, nodeCount, live, history)
- if err != nil {
- t.Fatal(err)
+func TestSyncingViaDirectSubscribe(t *testing.T) {
+ //if nodes/chunks have been provided via commandline,
+ //run the tests with these values
+ if *nodes != 0 && *chunks != 0 {
+ log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
+ err := testSyncingViaDirectSubscribe(*chunks, *nodes)
+ if err != nil {
+ t.Fatal(err)
+ }
+ } else {
+ var nodeCnt []int
+ var chnkCnt []int
+ //if the `longrunning` flag has been provided
+ //run more test combinations
+ if *longrunning {
+ chnkCnt = []int{1, 8, 32, 256, 1024}
+ nodeCnt = []int{32, 16}
+ } else {
+ //default test
+ chnkCnt = []int{4, 32}
+ nodeCnt = []int{32, 16}
+ }
+ for _, chnk := range chnkCnt {
+ for _, n := range nodeCnt {
+ log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
+ err := testSyncingViaDirectSubscribe(chnk, n)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ }
}
}
-/*
-The test generates the given number of chunks
+func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-The upload is done by dependency to the global
-`live` and `history` variables;
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ store, datadir, err := createTestLocalStorageForID(id, addr)
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ store.Close()
+ }
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
-If `live` is set, first stream subscriptions are established, then
-upload to a random node.
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ DoSync: true,
+ SyncUpdateDelay: 3 * time.Second,
+ })
+ bucket.Store(bucketKeyRegistry, r)
-If `history` is enabled, first upload then build up subscriptions.
+ return r, cleanup, nil
-For every chunk generated, the nearest node addresses
-are identified, we verify that the nodes closer to the
-chunk addresses actually do have the chunks in their local stores.
+ },
+ })
+ defer sim.Close()
-The test loads a snapshot file to construct the swarm network,
-assuming that the snapshot file identifies a healthy
-kademlia network. The snapshot should have 'streamer' in its service list.
+ log.Info("Initializing test config")
-For every test run, a series of three tests will be executed:
-- a LIVE test first, where first subscriptions are established,
- then a file (random chunks) is uploaded
-- a HISTORY test, where the file is uploaded first, and then
- the subscriptions are established
-- a crude LIVE AND HISTORY test last, where (different) chunks
- are uploaded twice, once before and once after subscriptions
-*/
-func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
- initSyncTest()
- //the ids of the snapshot nodes, initiate only now as we need nodeCount
- ids = make([]discover.NodeID, nodeCount)
- //initialize the test struct
- conf = &synctestConfig{}
+ conf := &synctestConfig{}
//map of discover ID to indexes of chunks expected at that ID
conf.idToChunksMap = make(map[discover.NodeID][]int)
//map of overlay address to discover ID
- conf.addrToIdMap = make(map[string]discover.NodeID)
+ conf.addrToIDMap = make(map[string]discover.NodeID)
//array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0)
- //channel to trigger node checks in the simulation
- trigger := make(chan discover.NodeID)
- //channel to check for disconnection errors
- disconnectC := make(chan error)
- //channel to close disconnection watcher routine
- quitC := make(chan struct{})
-
- //load nodes from the snapshot file
- net, err := initNetWithSnapshot(nodeCount)
+
+ err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil {
- return err
+ t.Fatal(err)
}
- var rpcSubscriptionsWg sync.WaitGroup
- //do cleanup after test is terminated
- defer func() {
- // close quitC channel to signall all goroutines to clanup
- // before calling simulation network shutdown.
- close(quitC)
- //wait for all rpc subscriptions to unsubscribe
- rpcSubscriptionsWg.Wait()
- //shutdown the snapshot network
- net.Shutdown()
- //after the test, clean up local stores initialized with createLocalStoreForId
- localStoreCleanup()
- //finally clear all data directories
- datadirsCleanup()
- }()
- //get the nodes of the network
- nodes := net.GetNodes()
- //select one index at random...
- idx := rand.Intn(len(nodes))
- //...and get the the node at that index
- //this is the node selected for upload
- node := nodes[idx]
- log.Info("Initializing test config")
- //iterate over all nodes...
- for c := 0; c < len(nodes); c++ {
- //create an array of discovery node IDs
- ids[c] = nodes[c].ID()
- //get the kademlia overlay address from this ID
- a := network.ToOverlayAddr(ids[c].Bytes())
- //append it to the array of all overlay addresses
- conf.addrs = append(conf.addrs, a)
- //the proximity calculation is on overlay addr,
- //the p2p/simulations check func triggers on discover.NodeID,
- //so we need to know which overlay addr maps to which nodeID
- conf.addrToIdMap[string(a)] = ids[c]
- }
- log.Info("Test config successfully initialized")
-
- //only needed for healthy call when debugging
- ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs)
-
- //define the action to be performed before the test checks: start syncing
- action := func(ctx context.Context) error {
- //first run the health check on all nodes,
- //wait until nodes are all healthy
- ticker := time.NewTicker(200 * time.Millisecond)
- defer ticker.Stop()
- for range ticker.C {
- healthy := true
- for _, id := range ids {
- r := registries[id]
- //PeerPot for this node
- addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes()))
- pp := ppmap[addr]
- //call Healthy RPC
- h := r.delivery.overlay.Healthy(pp)
- //print info
- log.Debug(r.delivery.overlay.String())
- log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
- if !h.GotNN || !h.Full {
- healthy = false
- break
- }
- }
- if healthy {
- break
- }
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ defer cancelSimRun()
+
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ for _, n := range nodeIDs {
+ //get the kademlia overlay address from this ID
+ a := network.ToOverlayAddr(n.Bytes())
+ //append it to the array of all overlay addresses
+ conf.addrs = append(conf.addrs, a)
+ //the proximity calculation is on overlay addr,
+ //the p2p/simulations check func triggers on discover.NodeID,
+ //so we need to know which overlay addr maps to which nodeID
+ conf.addrToIDMap[string(a)] = n
+ }
+
+ //get the the node at that index
+ //this is the node selected for upload
+ node := sim.RandomUpNode()
+ item, ok := sim.NodeItem(node.ID, bucketKeyStore)
+ if !ok {
+ return fmt.Errorf("No localstore")
+ }
+ lstore := item.(*storage.LocalStore)
+ hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
+ if err != nil {
+ return err
+ }
+ conf.hashes = append(conf.hashes, hashes...)
+ mapKeysToNodes(conf)
+
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
}
- if history {
- log.Info("Uploading for history")
- //If testing only history, we upload the chunk(s) first
- chunks, err := uploadFileToSingleNodeStore(node.ID(), chunkCount)
+ // File retrieval check is repeated until all uploaded files are retrieved from all nodes
+ // or until the timeout is reached.
+ allSuccess := false
+ var gDir string
+ var globalStore *mockdb.GlobalStore
+ if *useMockStore {
+ gDir, globalStore, err = createGlobalStore()
if err != nil {
- return err
+ return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
}
- conf.hashes = append(conf.hashes, chunks...)
- //finally map chunks to the closest addresses
- mapKeysToNodes(conf)
+ defer func() {
+ os.RemoveAll(gDir)
+ err := globalStore.Close()
+ if err != nil {
+ log.Error("Error closing global store! %v", "err", err)
+ }
+ }()
}
+ for !allSuccess {
+ for _, id := range nodeIDs {
+ //for each expected chunk, check if it is in the local store
+ localChunks := conf.idToChunksMap[id]
+ localSuccess := true
+ for _, ch := range localChunks {
+ //get the real chunk by the index in the index array
+ chunk := conf.hashes[ch]
+ log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
+ //check if the expected chunk is indeed in the localstore
+ var err error
+ if *useMockStore {
+ //use the globalStore if the mockStore should be used; in that case,
+ //the complete localStore stack is bypassed for getting the chunk
+ _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
+ } else {
+ //use the actual localstore
+ item, ok := sim.NodeItem(id, bucketKeyStore)
+ if !ok {
+ return fmt.Errorf("Error accessing localstore")
+ }
+ lstore := item.(*storage.LocalStore)
+ _, err = lstore.Get(ctx, chunk)
+ }
+ if err != nil {
+ log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
+ localSuccess = false
+ } else {
+ log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+ }
+ }
+ allSuccess = localSuccess
+ }
+ }
+ if !allSuccess {
+ return fmt.Errorf("Not all chunks succeeded!")
+ }
+ return nil
+ })
- //variables needed to wait for all subscriptions established before uploading
- errc := make(chan error)
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+}
- //now setup and start event watching in order to know when we can upload
- ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
- defer watchCancel()
+/*
+The test generates the given number of chunks
- log.Info("Setting up stream subscription")
+For every chunk generated, the nearest node addresses
+are identified, we verify that the nodes closer to the
+chunk addresses actually do have the chunks in their local stores.
- //We need two iterations, one to subscribe to the subscription events
- //(so we know when setup phase is finished), and one to
- //actually run the stream subscriptions. We can't do it in the same iteration,
- //because while the first nodes in the loop are setting up subscriptions,
- //the latter ones have not subscribed to listen to peer events yet,
- //and then we miss events.
+The test loads a snapshot file to construct the swarm network,
+assuming that the snapshot file identifies a healthy
+kademlia network. The snapshot should have 'streamer' in its service list.
+*/
+func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- //first iteration: setup disconnection watcher and subscribe to peer events
- for j, id := range ids {
- log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j))
- client, err := net.GetNode(id).Client()
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ store, datadir, err := createTestLocalStorageForID(id, addr)
if err != nil {
- return err
+ return nil, nil, err
}
-
- wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC)
- // doneC is nil, the error happened which is sent to errc channel, already
- if wsDoneC == nil {
- continue
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ store.Close()
}
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-wsDoneC
- rpcSubscriptionsWg.Done()
- }()
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
- //watch for peers disconnecting
- wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC)
- if err != nil {
- return err
- }
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-wdDoneC
- rpcSubscriptionsWg.Done()
- }()
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
+ bucket.Store(bucketKeyRegistry, r)
+
+ fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ return r, cleanup, nil
+
+ },
+ })
+ defer sim.Close()
+
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ defer cancelSimRun()
+
+ conf := &synctestConfig{}
+ //map of discover ID to indexes of chunks expected at that ID
+ conf.idToChunksMap = make(map[discover.NodeID][]int)
+ //map of overlay address to discover ID
+ conf.addrToIDMap = make(map[string]discover.NodeID)
+ //array where the generated chunk hashes will be stored
+ conf.hashes = make([]storage.Address, 0)
+
+ err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ if err != nil {
+ return err
+ }
+
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
+ for _, n := range nodeIDs {
+ //get the kademlia overlay address from this ID
+ a := network.ToOverlayAddr(n.Bytes())
+ //append it to the array of all overlay addresses
+ conf.addrs = append(conf.addrs, a)
+ //the proximity calculation is on overlay addr,
+ //the p2p/simulations check func triggers on discover.NodeID,
+ //so we need to know which overlay addr maps to which nodeID
+ conf.addrToIDMap[string(a)] = n
}
- //second iteration: start syncing
- for j, id := range ids {
+ var subscriptionCount int
+
+ filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4)
+ eventC := sim.PeerEvents(ctx, nodeIDs, filter)
+
+ for j, node := range nodeIDs {
log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
- client, err := net.GetNode(id).Client()
- if err != nil {
- return err
- }
//start syncing!
+ item, ok := sim.NodeItem(node, bucketKeyRegistry)
+ if !ok {
+ return fmt.Errorf("No registry")
+ }
+ registry := item.(*Registry)
+
var cnt int
- err = client.CallContext(ctx, &cnt, "stream_startSyncing")
+ cnt, err = startSyncing(registry, conf)
if err != nil {
return err
}
@@ -370,117 +360,89 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
subscriptionCount += cnt
}
- //now wait until the number of expected subscriptions has been finished
- //`watchSubscriptionEvents` will write with a `nil` value to errc
- for err := range errc {
- if err != nil {
- return err
+ for e := range eventC {
+ if e.Error != nil {
+ return e.Error
}
- //`nil` received, decrement count
subscriptionCount--
- //all subscriptions received
if subscriptionCount == 0 {
break
}
}
-
- log.Info("Stream subscriptions successfully requested")
- if live {
- //now upload the chunks to the selected random single node
- hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount)
- if err != nil {
- return err
- }
- conf.hashes = append(conf.hashes, hashes...)
- //finally map chunks to the closest addresses
- log.Debug(fmt.Sprintf("Uploaded chunks for live syncing: %v", conf.hashes))
- mapKeysToNodes(conf)
- log.Info(fmt.Sprintf("Uploaded %d chunks to random single node", chunkCount))
+ //select a random node for upload
+ node := sim.RandomUpNode()
+ item, ok := sim.NodeItem(node.ID, bucketKeyStore)
+ if !ok {
+ return fmt.Errorf("No localstore")
}
+ lstore := item.(*storage.LocalStore)
+ hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
+ if err != nil {
+ return err
+ }
+ conf.hashes = append(conf.hashes, hashes...)
+ mapKeysToNodes(conf)
- log.Info("Action terminated")
-
- return nil
- }
-
- //check defines what will be checked during the test
- check := func(ctx context.Context, id discover.NodeID) (bool, error) {
- select {
- case <-ctx.Done():
- return false, ctx.Err()
- case e := <-disconnectC:
- log.Error(e.Error())
- return false, fmt.Errorf("Disconnect event detected, network unhealthy")
- default:
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
}
- log.Trace(fmt.Sprintf("Checking node: %s", id))
- //select the local store for the given node
- //if there are more than one chunk, test only succeeds if all expected chunks are found
- allSuccess := true
-
- //all the chunk indexes which are supposed to be found for this node
- localChunks := conf.idToChunksMap[id]
- //for each expected chunk, check if it is in the local store
- for _, ch := range localChunks {
- //get the real chunk by the index in the index array
- chunk := conf.hashes[ch]
- log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
- //check if the expected chunk is indeed in the localstore
- var err error
- if *useMockStore {
- if globalStore == nil {
- return false, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
- }
- //use the globalStore if the mockStore should be used; in that case,
- //the complete localStore stack is bypassed for getting the chunk
- _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
- } else {
- //use the actual localstore
- lstore := stores[id]
- _, err = lstore.Get(context.TODO(), chunk)
- }
+
+ var gDir string
+ var globalStore *mockdb.GlobalStore
+ if *useMockStore {
+ gDir, globalStore, err = createGlobalStore()
if err != nil {
- log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
- allSuccess = false
- } else {
- log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+ return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
}
+ defer os.RemoveAll(gDir)
}
-
- return allSuccess, nil
- }
-
- //for each tick, run the checks on all nodes
- timingTicker := time.NewTicker(time.Second * 1)
- defer timingTicker.Stop()
- go func() {
- for range timingTicker.C {
- for i := 0; i < len(ids); i++ {
- log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i]))
- trigger <- ids[i]
+ // File retrieval check is repeated until all uploaded files are retrieved from all nodes
+ // or until the timeout is reached.
+ allSuccess := false
+ for !allSuccess {
+ for _, id := range nodeIDs {
+ //for each expected chunk, check if it is in the local store
+ localChunks := conf.idToChunksMap[id]
+ localSuccess := true
+ for _, ch := range localChunks {
+ //get the real chunk by the index in the index array
+ chunk := conf.hashes[ch]
+ log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
+ //check if the expected chunk is indeed in the localstore
+ var err error
+ if *useMockStore {
+ //use the globalStore if the mockStore should be used; in that case,
+ //the complete localStore stack is bypassed for getting the chunk
+ _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
+ } else {
+ //use the actual localstore
+ item, ok := sim.NodeItem(id, bucketKeyStore)
+ if !ok {
+ return fmt.Errorf("Error accessing localstore")
+ }
+ lstore := item.(*storage.LocalStore)
+ _, err = lstore.Get(ctx, chunk)
+ }
+ if err != nil {
+ log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
+ localSuccess = false
+ } else {
+ log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+ }
+ }
+ allSuccess = localSuccess
}
}
- }()
-
- log.Info("Starting simulation run...")
-
- timeout := MaxTimeout * time.Second
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
-
- //run the simulation
- result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{
- Action: action,
- Trigger: trigger,
- Expect: &simulations.Expectation{
- Nodes: ids,
- Check: check,
- },
+ if !allSuccess {
+ return fmt.Errorf("Not all chunks succeeded!")
+ }
+ return nil
})
if result.Error != nil {
return result.Error
}
+
log.Info("Simulation terminated")
return nil
}
@@ -489,20 +451,9 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
//the kademlia's `EachBin` function.
//returns the number of subscriptions requested
-func (r *TestRegistry) StartSyncing(ctx context.Context) (int, error) {
+func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
var err error
- if log.Lvl(*loglevel) == log.LvlDebug {
- //PeerPot for this node
- addr := common.Bytes2Hex(r.addr.OAddr)
- pp := ppmap[addr]
- //call Healthy RPC
- h := r.delivery.overlay.Healthy(pp)
- //print info
- log.Debug(r.delivery.overlay.String())
- log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full))
- }
-
kad, ok := r.delivery.overlay.(*network.Kademlia)
if !ok {
return 0, fmt.Errorf("Not a Kademlia!")
@@ -512,14 +463,10 @@ func (r *TestRegistry) StartSyncing(ctx context.Context) (int, error) {
//iterate over each bin and solicit needed subscription to bins
kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool {
//identify begin and start index of the bin(s) we want to subscribe to
- log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), conf.addrToIdMap[string(conn.Address())], po))
- var histRange *Range
- if history {
- histRange = &Range{}
- }
+ histRange := &Range{}
subCnt++
- err = r.RequestSubscription(conf.addrToIdMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), live), histRange, Top)
+ err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top)
if err != nil {
log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
return false
@@ -552,7 +499,7 @@ func mapKeysToNodes(conf *synctestConfig) {
return false
}
if pl == 256 || pl == po {
- log.Trace(fmt.Sprintf("appending %s", conf.addrToIdMap[string(a)]))
+ log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)]))
nns = append(nns, indexmap[string(a)])
nodemap[string(a)] = append(nodemap[string(a)], i)
}
@@ -567,26 +514,24 @@ func mapKeysToNodes(conf *synctestConfig) {
}
for addr, chunks := range nodemap {
//this selects which chunks are expected to be found with the given node
- conf.idToChunksMap[conf.addrToIdMap[addr]] = chunks
+ conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
}
log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
conf.chunksToNodesMap = kmap
}
//upload a file(chunks) to a single local node store
-func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage.Address, error) {
+func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) {
log.Debug(fmt.Sprintf("Uploading to node id: %s", id))
- lstore := stores[id]
- size := chunkSize
fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
+ size := chunkSize
var rootAddrs []storage.Address
for i := 0; i < chunkCount; i++ {
- ctx := context.TODO()
- rk, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
+ rk, wait, err := fileStore.Store(context.TODO(), io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
return nil, err
}
- err = wait(ctx)
+ err = wait(context.TODO())
if err != nil {
return nil, err
}
@@ -595,129 +540,3 @@ func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage.
return rootAddrs, nil
}
-
-//initialize a network from a snapshot
-func initNetWithSnapshot(nodeCount int) (*simulations.Network, error) {
-
- var a adapters.NodeAdapter
- //add the streamer service to the node adapter
-
- if *adapter == "exec" {
- dirname, err := ioutil.TempDir(".", "")
- if err != nil {
- return nil, err
- }
- a = adapters.NewExecAdapter(dirname)
- } else if *adapter == "tcp" {
- a = adapters.NewTCPAdapter(services)
- } else if *adapter == "sim" {
- a = adapters.NewSimAdapter(services)
- }
-
- log.Info("Setting up Snapshot network")
-
- net := simulations.NewNetwork(a, &simulations.NetworkConfig{
- ID: "0",
- DefaultService: "streamer",
- })
-
- f, err := os.Open(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
- if err != nil {
- return nil, err
- }
- defer f.Close()
- jsonbyte, err := ioutil.ReadAll(f)
- if err != nil {
- return nil, err
- }
- var snap simulations.Snapshot
- err = json.Unmarshal(jsonbyte, &snap)
- if err != nil {
- return nil, err
- }
-
- //the snapshot probably has the property EnableMsgEvents not set
- //just in case, set it to true!
- //(we need this to wait for messages before uploading)
- for _, n := range snap.Nodes {
- n.Node.Config.EnableMsgEvents = true
- }
-
- log.Info("Waiting for p2p connections to be established...")
-
- //now we can load the snapshot
- err = net.Load(&snap)
- if err != nil {
- return nil, err
- }
- log.Info("Snapshot loaded")
- return net, nil
-}
-
-//we want to wait for subscriptions to be established before uploading to test
-//that live syncing is working correctly
-func watchSubscriptionEvents(ctx context.Context, id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}) {
- events := make(chan *p2p.PeerEvent)
- sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
- if err != nil {
- log.Error(err.Error())
- errc <- fmt.Errorf("error getting peer events for node %v: %s", id, err)
- return
- }
- c := make(chan struct{})
-
- go func() {
- defer func() {
- log.Trace("watch subscription events: unsubscribe", "id", id)
- sub.Unsubscribe()
- close(c)
- }()
-
- for {
- select {
- case <-quitC:
- return
- case <-ctx.Done():
- select {
- case errc <- ctx.Err():
- case <-quitC:
- }
- return
- case e := <-events:
- //just catch SubscribeMsg
- if e.Type == p2p.PeerEventTypeMsgRecv && e.Protocol == "stream" && e.MsgCode != nil && *e.MsgCode == 4 {
- errc <- nil
- }
- case err := <-sub.Err():
- if err != nil {
- select {
- case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err):
- case <-quitC:
- }
- return
- }
- }
- }
- }()
- return c
-}
-
-//create a local store for the given node
-func createTestLocalStorageForId(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
- var datadir string
- var err error
- datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
- if err != nil {
- return nil, err
- }
- datadirs[id] = datadir
- var store storage.ChunkStore
- params := storage.NewDefaultLocalStoreParams()
- params.ChunkDbPath = datadir
- params.BaseKey = addr.Over()
- store, err = storage.NewTestLocalStoreForAddr(params)
- if err != nil {
- return nil, err
- }
- return store, nil
-}
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index a3d53e648..f72aa3444 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -23,18 +23,22 @@ import (
"io"
"io/ioutil"
"math"
+ "os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations"
- "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
- streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
const dataChunkCount = 200
@@ -46,222 +50,193 @@ func TestSyncerSimulation(t *testing.T) {
testSyncBetweenNodes(t, 16, 1, dataChunkCount, true, 1)
}
-func createMockStore(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) {
- var err error
+func createMockStore(globalStore *mockdb.GlobalStore, id discover.NodeID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) {
address := common.BytesToAddress(id.Bytes())
mockStore := globalStore.NewNodeStore(address)
params := storage.NewDefaultLocalStoreParams()
- datadirs[id], err = ioutil.TempDir("", "localMockStore-"+id.TerminalString())
+
+ datadir, err = ioutil.TempDir("", "localMockStore-"+id.TerminalString())
if err != nil {
- return nil, err
+ return nil, "", err
}
- params.Init(datadirs[id])
+ params.Init(datadir)
params.BaseKey = addr.Over()
- lstore, err := storage.NewLocalStore(params, mockStore)
- return lstore, nil
+ lstore, err = storage.NewLocalStore(params, mockStore)
+ return lstore, datadir, nil
}
func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool, po uint8) {
- defer setDefaultSkipCheck(defaultSkipCheck)
- defaultSkipCheck = skipCheck
- //data directories for each node and store
- datadirs = make(map[discover.NodeID]string)
- if *useMockStore {
- createStoreFunc = createMockStore
- createGlobalStore()
- } else {
- createStoreFunc = createTestLocalStorageFromSim
- }
- defer datadirsCleanup()
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ var store storage.ChunkStore
+ var globalStore *mockdb.GlobalStore
+ var gDir, datadir string
+
+ id := ctx.Config.ID
+ addr := network.NewAddrFromNodeID(id)
+ //hack to put addresses in same space
+ addr.OAddr[0] = byte(0)
+
+ if *useMockStore {
+ gDir, globalStore, err = createGlobalStore()
+ if err != nil {
+ return nil, nil, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
+ }
+ store, datadir, err = createMockStore(globalStore, id, addr)
+ } else {
+ store, datadir, err = createTestLocalStorageForID(id, addr)
+ }
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyStore, store)
+ cleanup = func() {
+ store.Close()
+ os.RemoveAll(datadir)
+ if *useMockStore {
+ err := globalStore.Close()
+ if err != nil {
+ log.Error("Error closing global store! %v", "err", err)
+ }
+ os.RemoveAll(gDir)
+ }
+ }
+ localStore := store.(*storage.LocalStore)
+ db := storage.NewDBAPI(localStore)
+ bucket.Store(bucketKeyDB, db)
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, db)
+ bucket.Store(bucketKeyDelivery, delivery)
+
+ r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ })
+
+ fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ return r, cleanup, nil
+
+ },
+ })
+ defer sim.Close()
- registries = make(map[discover.NodeID]*TestRegistry)
- toAddr = func(id discover.NodeID) *network.BzzAddr {
- addr := network.NewAddrFromNodeID(id)
- //hack to put addresses in same space
- addr.OAddr[0] = byte(0)
- return addr
- }
- conf := &streamTesting.RunConfig{
- Adapter: *adapter,
- NodeCount: nodes,
- ConnLevel: conns,
- ToAddr: toAddr,
- Services: services,
- EnableMsgEvents: false,
- }
- // HACK: these are global variables in the test so that they are available for
- // the service constructor function
- // TODO: will this work with exec/docker adapter?
- // localstore of nodes made available for action and check calls
- stores = make(map[discover.NodeID]storage.ChunkStore)
- deliveries = make(map[discover.NodeID]*Delivery)
// create context for simulation run
timeout := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// defer cancel should come before defer simulation teardown
defer cancel()
- // create simulation network with the config
- sim, teardown, err := streamTesting.NewSimulation(conf)
- var rpcSubscriptionsWg sync.WaitGroup
- defer func() {
- rpcSubscriptionsWg.Wait()
- teardown()
- }()
+ _, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
- t.Fatal(err.Error())
+ t.Fatal(err)
}
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ nodeIDs := sim.UpNodeIDs()
- nodeIndex := make(map[discover.NodeID]int)
- for i, id := range sim.IDs {
- nodeIndex[id] = i
- if !*useMockStore {
- stores[id] = sim.Stores[i]
- sim.Stores[i] = stores[id]
+ nodeIndex := make(map[discover.NodeID]int)
+ for i, id := range nodeIDs {
+ nodeIndex[id] = i
}
- }
- // peerCount function gives the number of peer connections for a nodeID
- // this is needed for the service run function to wait until
- // each protocol instance runs and the streamer peers are available
- peerCount = func(id discover.NodeID) int {
- if sim.IDs[0] == id || sim.IDs[nodes-1] == id {
- return 1
- }
- return 2
- }
- waitPeerErrC = make(chan error)
- // create DBAPI-s for all nodes
- dbs := make([]*storage.DBAPI, nodes)
- for i := 0; i < nodes; i++ {
- dbs[i] = storage.NewDBAPI(sim.Stores[i].(*storage.LocalStore))
- }
-
- // collect hashes in po 1 bin for each node
- hashes := make([][]storage.Address, nodes)
- totalHashes := 0
- hashCounts := make([]int, nodes)
- for i := nodes - 1; i >= 0; i-- {
- if i < nodes-1 {
- hashCounts[i] = hashCounts[i+1]
- }
- dbs[i].Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
- hashes[i] = append(hashes[i], addr)
- totalHashes++
- hashCounts[i]++
- return true
- })
- }
-
- // errc is error channel for simulation
- errc := make(chan error, 1)
- quitC := make(chan struct{})
- defer close(quitC)
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ if d.Error != nil {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal(d.Error)
+ }
+ }
+ }()
- // action is subscribe
- action := func(ctx context.Context) error {
- // need to wait till an aynchronous process registers the peers in streamer.peers
- // that is used by Subscribe
- // the global peerCount function tells how many connections each node has
- // TODO: this is to be reimplemented with peerEvent watcher without global var
- i := 0
- for err := range waitPeerErrC {
+ // each node Subscribes to each other's swarmChunkServerStreamName
+ for j := 0; j < nodes-1; j++ {
+ id := nodeIDs[j]
+ client, err := sim.Net.GetNode(id).Client()
if err != nil {
- return fmt.Errorf("error waiting for peers: %s", err)
+ t.Fatal(err)
}
- i++
- if i == nodes {
- break
+ sid := nodeIDs[j+1]
+ client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
+ if err != nil {
+ return err
}
- }
- // each node Subscribes to each other's swarmChunkServerStreamName
- for j := 0; j < nodes-1; j++ {
- id := sim.IDs[j]
- sim.Stores[j] = stores[id]
- err := sim.CallClient(id, func(client *rpc.Client) error {
- // report disconnect events to the error channel cos peers should not disconnect
- doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC)
+ if j > 0 || nodes == 2 {
+ item, ok := sim.NodeItem(nodeIDs[j], bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
+ }
+ fileStore := item.(*storage.FileStore)
+ size := chunkCount * chunkSize
+ _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
- return err
+ t.Fatal(err.Error())
}
- rpcSubscriptionsWg.Add(1)
- go func() {
- <-doneC
- rpcSubscriptionsWg.Done()
- }()
- ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
- defer cancel()
- // start syncing, i.e., subscribe to upstream peers po 1 bin
- sid := sim.IDs[j+1]
- return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
- })
- if err != nil {
- return err
+ wait(ctx)
}
}
// here we distribute chunks of a random file into stores 1...nodes
- rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
- size := chunkCount * chunkSize
- _, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
- if err != nil {
- t.Fatal(err.Error())
- }
- // need to wait cos we then immediately collect the relevant bin content
- wait(ctx)
- if err != nil {
- t.Fatal(err.Error())
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
}
- return nil
- }
-
- // this makes sure check is not called before the previous call finishes
- check := func(ctx context.Context, id discover.NodeID) (bool, error) {
- select {
- case err := <-errc:
- return false, err
- case <-ctx.Done():
- return false, ctx.Err()
- default:
+ // collect hashes in po 1 bin for each node
+ hashes := make([][]storage.Address, nodes)
+ totalHashes := 0
+ hashCounts := make([]int, nodes)
+ for i := nodes - 1; i >= 0; i-- {
+ if i < nodes-1 {
+ hashCounts[i] = hashCounts[i+1]
+ }
+ item, ok := sim.NodeItem(nodeIDs[i], bucketKeyDB)
+ if !ok {
+ return fmt.Errorf("No DB")
+ }
+ db := item.(*storage.DBAPI)
+ db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
+ hashes[i] = append(hashes[i], addr)
+ totalHashes++
+ hashCounts[i]++
+ return true
+ })
}
-
- i := nodeIndex[id]
var total, found int
-
- for j := i; j < nodes; j++ {
- total += len(hashes[j])
- for _, key := range hashes[j] {
- chunk, err := dbs[i].Get(ctx, key)
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
- continue
+ for _, node := range nodeIDs {
+ i := nodeIndex[node]
+
+ for j := i; j < nodes; j++ {
+ total += len(hashes[j])
+ for _, key := range hashes[j] {
+ item, ok := sim.NodeItem(nodeIDs[j], bucketKeyDB)
+ if !ok {
+ return fmt.Errorf("No DB")
+ }
+ db := item.(*storage.DBAPI)
+ chunk, err := db.Get(ctx, key)
+ if err == storage.ErrFetching {
+ <-chunk.ReqC
+ } else if err != nil {
+ continue
+ }
+ // needed for leveldb not to be closed?
+ // chunk.WaitToStore()
+ found++
}
- // needed for leveldb not to be closed?
- // chunk.WaitToStore()
- found++
}
+ log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)
}
- log.Debug("sync check", "node", id, "index", i, "bin", po, "found", found, "total", total)
- return total == found, nil
- }
+ if total == found && total > 0 {
+ return nil
+ }
+ return fmt.Errorf("Total not equallying found: total is %d", total)
+ })
- conf.Step = &simulations.Step{
- Action: action,
- Trigger: streamTesting.Trigger(500*time.Millisecond, quitC, sim.IDs[0:nodes-1]...),
- Expect: &simulations.Expectation{
- Nodes: sim.IDs[0:1],
- Check: check,
- },
- }
- startedAt := time.Now()
- result, err := sim.Run(ctx, conf)
- finishedAt := time.Now()
- if err != nil {
- t.Fatalf("Setting up simulation failed: %v", err)
- }
if result.Error != nil {
- t.Fatalf("Simulation failed: %s", result.Error)
+ t.Fatal(result.Error)
}
- streamTesting.CheckResult(t, result, startedAt, finishedAt)
}
diff --git a/swarm/network/stream/testing/testing.go b/swarm/network/stream/testing/testing.go
deleted file mode 100644
index d584ec397..000000000
--- a/swarm/network/stream/testing/testing.go
+++ /dev/null
@@ -1,293 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package testing
-
-import (
- "context"
- "errors"
- "fmt"
- "io/ioutil"
- "math/rand"
- "os"
- "sync"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
- "github.com/ethereum/go-ethereum/swarm/storage"
-)
-
-type Simulation struct {
- Net *simulations.Network
- Stores []storage.ChunkStore
- Addrs []network.Addr
- IDs []discover.NodeID
-}
-
-func SetStores(addrs ...network.Addr) ([]storage.ChunkStore, func(), error) {
- var datadirs []string
- stores := make([]storage.ChunkStore, len(addrs))
- var err error
- for i, addr := range addrs {
- var datadir string
- datadir, err = ioutil.TempDir("", "streamer")
- if err != nil {
- break
- }
- var store storage.ChunkStore
- params := storage.NewDefaultLocalStoreParams()
- params.Init(datadir)
- params.BaseKey = addr.Over()
- store, err = storage.NewTestLocalStoreForAddr(params)
- if err != nil {
- break
- }
- datadirs = append(datadirs, datadir)
- stores[i] = store
- }
- teardown := func() {
- for i, datadir := range datadirs {
- stores[i].Close()
- os.RemoveAll(datadir)
- }
- }
- return stores, teardown, err
-}
-
-func NewAdapter(adapterType string, services adapters.Services) (adapter adapters.NodeAdapter, teardown func(), err error) {
- teardown = func() {}
- switch adapterType {
- case "sim":
- adapter = adapters.NewSimAdapter(services)
- case "exec":
- baseDir, err0 := ioutil.TempDir("", "swarm-test")
- if err0 != nil {
- return nil, teardown, err0
- }
- teardown = func() { os.RemoveAll(baseDir) }
- adapter = adapters.NewExecAdapter(baseDir)
- case "docker":
- adapter, err = adapters.NewDockerAdapter()
- if err != nil {
- return nil, teardown, err
- }
- default:
- return nil, teardown, errors.New("adapter needs to be one of sim, exec, docker")
- }
- return adapter, teardown, nil
-}
-
-func CheckResult(t *testing.T, result *simulations.StepResult, startedAt, finishedAt time.Time) {
- t.Logf("Simulation passed in %s", result.FinishedAt.Sub(result.StartedAt))
- if len(result.Passes) > 1 {
- var min, max time.Duration
- var sum int
- for _, pass := range result.Passes {
- duration := pass.Sub(result.StartedAt)
- if sum == 0 || duration < min {
- min = duration
- }
- if duration > max {
- max = duration
- }
- sum += int(duration.Nanoseconds())
- }
- t.Logf("Min: %s, Max: %s, Average: %s", min, max, time.Duration(sum/len(result.Passes))*time.Nanosecond)
- }
- t.Logf("Setup: %s, Shutdown: %s", result.StartedAt.Sub(startedAt), finishedAt.Sub(result.FinishedAt))
-}
-
-type RunConfig struct {
- Adapter string
- Step *simulations.Step
- NodeCount int
- ConnLevel int
- ToAddr func(discover.NodeID) *network.BzzAddr
- Services adapters.Services
- DefaultService string
- EnableMsgEvents bool
-}
-
-func NewSimulation(conf *RunConfig) (*Simulation, func(), error) {
- // create network
- nodes := conf.NodeCount
- adapter, adapterTeardown, err := NewAdapter(conf.Adapter, conf.Services)
- if err != nil {
- return nil, adapterTeardown, err
- }
- defaultService := "streamer"
- if conf.DefaultService != "" {
- defaultService = conf.DefaultService
- }
- net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
- ID: "0",
- DefaultService: defaultService,
- })
- teardown := func() {
- adapterTeardown()
- net.Shutdown()
- }
- ids := make([]discover.NodeID, nodes)
- addrs := make([]network.Addr, nodes)
- // start nodes
- for i := 0; i < nodes; i++ {
- nodeconf := adapters.RandomNodeConfig()
- nodeconf.EnableMsgEvents = conf.EnableMsgEvents
- node, err := net.NewNodeWithConfig(nodeconf)
- if err != nil {
- return nil, teardown, fmt.Errorf("error creating node: %s", err)
- }
- ids[i] = node.ID()
- addrs[i] = conf.ToAddr(ids[i])
- }
- // set nodes number of Stores available
- stores, storeTeardown, err := SetStores(addrs...)
- teardown = func() {
- net.Shutdown()
- adapterTeardown()
- storeTeardown()
- }
- if err != nil {
- return nil, teardown, err
- }
- s := &Simulation{
- Net: net,
- Stores: stores,
- IDs: ids,
- Addrs: addrs,
- }
- return s, teardown, nil
-}
-
-func (s *Simulation) Run(ctx context.Context, conf *RunConfig) (*simulations.StepResult, error) {
- // bring up nodes, launch the servive
- nodes := conf.NodeCount
- conns := conf.ConnLevel
- for i := 0; i < nodes; i++ {
- if err := s.Net.Start(s.IDs[i]); err != nil {
- return nil, fmt.Errorf("error starting node %s: %s", s.IDs[i].TerminalString(), err)
- }
- }
- // run a simulation which connects the 10 nodes in a chain
- wg := sync.WaitGroup{}
- for i := range s.IDs {
- // collect the overlay addresses, to
- for j := 0; j < conns; j++ {
- var k int
- if j == 0 {
- k = i - 1
- } else {
- k = rand.Intn(len(s.IDs))
- }
- if i > 0 {
- wg.Add(1)
- go func(i, k int) {
- defer wg.Done()
- s.Net.Connect(s.IDs[i], s.IDs[k])
- }(i, k)
- }
- }
- }
- wg.Wait()
- log.Info(fmt.Sprintf("simulation with %v nodes", len(s.Addrs)))
-
- // create an only locally retrieving FileStore for the pivot node to test
- // if retriee requests have arrived
- result := simulations.NewSimulation(s.Net).Run(ctx, conf.Step)
- return result, nil
-}
-
-// WatchDisconnections subscribes to admin peerEvents and sends peer event drop
-// errors to the errc channel. Channel quitC signals the termination of the event loop.
-// Returned doneC will be closed after the rpc subscription is unsubscribed,
-// signaling that simulations network is safe to shutdown.
-func WatchDisconnections(id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}, err error) {
- events := make(chan *p2p.PeerEvent)
- sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
- if err != nil {
- return nil, fmt.Errorf("error getting peer events for node %v: %s", id, err)
- }
- c := make(chan struct{})
- go func() {
- defer func() {
- log.Trace("watch disconnections: unsubscribe", "id", id)
- sub.Unsubscribe()
- close(c)
- }()
- for {
- select {
- case <-quitC:
- return
- case e := <-events:
- if e.Type == p2p.PeerEventTypeDrop {
- select {
- case errc <- fmt.Errorf("peerEvent for node %v: %v", id, e):
- case <-quitC:
- return
- }
- }
- case err := <-sub.Err():
- if err != nil {
- select {
- case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err):
- case <-quitC:
- return
- }
- }
- }
- }
- }()
- return c, nil
-}
-
-func Trigger(d time.Duration, quitC chan struct{}, ids ...discover.NodeID) chan discover.NodeID {
- trigger := make(chan discover.NodeID)
- go func() {
- defer close(trigger)
- ticker := time.NewTicker(d)
- defer ticker.Stop()
- // we are only testing the pivot node (net.Nodes[0])
- for range ticker.C {
- for _, id := range ids {
- select {
- case trigger <- id:
- case <-quitC:
- return
- }
- }
- }
- }()
- return trigger
-}
-
-func (sim *Simulation) CallClient(id discover.NodeID, f func(*rpc.Client) error) error {
- node := sim.Net.GetNode(id)
- if node == nil {
- return fmt.Errorf("unknown node: %s", id)
- }
- client, err := node.Client()
- if err != nil {
- return fmt.Errorf("error getting node client: %s", err)
- }
- return f(client)
-}