Diffstat (limited to 'swarm/network')
-rw-r--r--  swarm/network/fetcher.go | 70
-rw-r--r--  swarm/network/fetcher_test.go | 125
-rw-r--r--  swarm/network/hive_test.go | 60
-rw-r--r--  swarm/network/kademlia.go | 297
-rw-r--r--  swarm/network/kademlia_test.go | 822
-rw-r--r--  swarm/network/networkid_test.go | 13
-rw-r--r--  swarm/network/protocol.go | 16
-rw-r--r--  swarm/network/protocol_test.go | 26
-rw-r--r--  swarm/network/simulation/kademlia.go | 2
-rw-r--r--  swarm/network/simulation/node.go | 4
-rw-r--r--  swarm/network/simulation/node_test.go | 63
-rw-r--r--  swarm/network/simulation/service.go | 2
-rw-r--r--  swarm/network/simulation/simulation_test.go | 4
-rw-r--r--  swarm/network/simulations/discovery/discovery_test.go | 98
-rw-r--r--  swarm/network/simulations/overlay_test.go | 4
-rw-r--r--  swarm/network/stream/common_test.go | 149
-rw-r--r--  swarm/network/stream/delivery.go | 9
-rw-r--r--  swarm/network/stream/delivery_test.go | 337
-rw-r--r--  swarm/network/stream/intervals_test.go | 58
-rw-r--r--  swarm/network/stream/lightnode_test.go | 16
-rw-r--r--  swarm/network/stream/messages.go | 2
-rw-r--r--  swarm/network/stream/norace_test.go | 24
-rw-r--r--  swarm/network/stream/peer.go | 26
-rw-r--r--  swarm/network/stream/race_test.go | 23
-rw-r--r--  swarm/network/stream/snapshot_retrieval_test.go | 69
-rw-r--r--  swarm/network/stream/snapshot_sync_test.go | 107
-rw-r--r--  swarm/network/stream/stream.go | 40
-rw-r--r--  swarm/network/stream/streamer_test.go | 295
-rw-r--r--  swarm/network/stream/syncer_test.go | 161
-rw-r--r--  swarm/network/stream/testing/snapshot_4.json | 1
-rw-r--r--  swarm/network/stream/visualized_snapshot_sync_sim_test.go | 62
31 files changed, 1498 insertions, 1487 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
index 6aed57e22..6b2175166 100644
--- a/swarm/network/fetcher.go
+++ b/swarm/network/fetcher.go
@@ -26,20 +26,23 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)
-var searchTimeout = 1 * time.Second
+const (
+ defaultSearchTimeout = 1 * time.Second
+ // maximum number of forwarded requests (hops), to make sure requests are not
+ // forwarded forever in peer loops
+ maxHopCount uint8 = 20
+)
// Time to consider peer to be skipped.
// Also used in stream delivery.
var RequestTimeout = 10 * time.Second
-var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops
-
type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
// keeps it alive until all active requests are completed. This can happen:
// 1. either because the chunk is delivered
-// 2. or becuse the requestor cancelled/timed out
+// 2. or because the requester cancelled/timed out
// Fetcher self destroys itself after it is completed.
// TODO: cancel all forward requests after termination
type Fetcher struct {
@@ -47,7 +50,9 @@ type Fetcher struct {
addr storage.Address // the address of the chunk to be fetched
offerC chan *enode.ID // channel of sources (peer node id strings)
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
+ searchTimeout time.Duration
skipCheck bool
+ ctx context.Context
}
type Request struct {
@@ -79,7 +84,7 @@ func (r *Request) SkipPeer(nodeID string) bool {
}
t, ok := val.(time.Time)
if ok && time.Now().After(t.Add(RequestTimeout)) {
- // deadine expired
+ // deadline expired
r.peersToSkip.Delete(nodeID)
return false
}
@@ -100,32 +105,35 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
}
}
-// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to
-// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
-// this chunk, to make sure we don't request back the chunks from them.
+// New constructs a new Fetcher, for the given chunk. All peers in peersToSkip
+// are not requested to deliver the given chunk. peersToSkip should always
+// contain the peers which are actively requesting this chunk, to make sure we
+// don't request back the chunks from them.
// The created Fetcher is started and returned.
-func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
- fetcher := NewFetcher(source, f.request, f.skipCheck)
- go fetcher.run(ctx, peersToSkip)
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
+ fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
+ go fetcher.run(peers)
return fetcher
}
// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
-func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
return &Fetcher{
addr: addr,
protoRequestFunc: rf,
offerC: make(chan *enode.ID),
requestC: make(chan uint8),
+ searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck,
+ ctx: ctx,
}
}
// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
-func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
+func (f *Fetcher) Offer(source *enode.ID) {
// First we need to have this select to make sure that we return if context is done
select {
- case <-ctx.Done():
+ case <-f.ctx.Done():
return
default:
}
@@ -134,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.offerC <- source:
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
-func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
+func (f *Fetcher) Request(hopCount uint8) {
// First we need to have this select to make sure that we return if context is done
select {
- case <-ctx.Done():
+ case <-f.ctx.Done():
return
default:
}
@@ -156,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.requestC <- hopCount + 1:
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}
// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
-func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+func (f *Fetcher) run(peers *sync.Map) {
var (
doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout
@@ -176,7 +184,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// loop that keeps the fetching process alive
// after every request a timer is set. If this goes off we request again from another peer
// note that the previous request is still alive and has the chance to deliver, so
- // rerequesting extends the search. ie.,
+ // requesting again extends the search. ie.,
// if a peer we requested from is gone we issue a new request, so the number of active
// requests never decreases
for {
@@ -209,20 +217,20 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// search timeout: too much time passed since the last request,
// extend the search to a new peer if we can find one
case <-waitC:
- log.Trace("search timed out: rerequesting", "request addr", f.addr)
+ log.Trace("search timed out: requesting", "request addr", f.addr)
doRequest = requested
// all Fetcher context closed, can quit
- case <-ctx.Done():
+ case <-f.ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr)
- // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
+ // TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return
}
// need to issue a new request
if doRequest {
var err error
- sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
+ sources, err = f.doRequest(gone, peers, sources, hopCount)
if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err)
}
@@ -231,7 +239,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// if wait channel is not set, set it to a timer
if requested {
if wait == nil {
- wait = time.NewTimer(searchTimeout)
+ wait = time.NewTimer(f.searchTimeout)
defer wait.Stop()
waitC = wait.C
} else {
@@ -242,8 +250,8 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
default:
}
}
- // reset the timer to go off after searchTimeout
- wait.Reset(searchTimeout)
+ // reset the timer to go off after defaultSearchTimeout
+ wait.Reset(f.searchTimeout)
}
}
doRequest = false
@@ -260,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
-func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
+func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int
var sourceID *enode.ID
var quit chan struct{}
@@ -277,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
- sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
// Note: we can modify the source although we are looping on it, because we break from the loop immediately
@@ -291,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
if !foundSource {
req.Source = nil
var err error
- sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err != nil {
// if no peers found to request from
return sources, err
@@ -308,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
select {
case <-quit:
gone <- sourceID
- case <-ctx.Done():
+ case <-f.ctx.Done():
}
}()
return sources, nil
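
Note on usage: after this change the request context is bound to the Fetcher at construction time, so Offer and Request no longer take a context argument, and the search timeout becomes a per-fetcher field instead of the package-level searchTimeout variable. A minimal sketch of the new call pattern, assuming it lives in package network and reuses the mockRequester/sourcePeerID test helpers from fetcher_test.go (TestFetcherUsageSketch is a hypothetical name, not part of the patch):

    func TestFetcherUsageSketch(t *testing.T) {
        requester := newMockRequester()
        addr := make([]byte, 32)

        // the context passed here is stored as f.ctx and controls the whole fetcher lifecycle
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
        // searchTimeout is now a field; tests override it instead of the old global variable
        fetcher.searchTimeout = 250 * time.Millisecond

        peersToSkip := &sync.Map{}
        go fetcher.run(peersToSkip)

        fetcher.Offer(&sourcePeerID) // a syncing peer offered the chunk
        fetcher.Request(0)           // a retrieve request arrived with hop count 0

        select {
        case <-requester.requestC:
            // the fetcher forwarded a request to the mock requester
        case <-time.After(time.Second):
            t.Fatal("expected a request to be initiated")
        }
    }

Cancelling the constructor context stops the fetcher, which is what TestFetcherCancelStopsFetcher below relies on.
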
diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go
index 3a926f475..4e464f10f 100644
--- a/swarm/network/fetcher_test.go
+++ b/swarm/network/fetcher_test.go
@@ -69,7 +69,11 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
func TestFetcherSingleRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
peers := []string{"a", "b", "c", "d"}
peersToSkip := &sync.Map{}
@@ -77,13 +81,9 @@ func TestFetcherSingleRequest(t *testing.T) {
peersToSkip.Store(p, time.Now())
}
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ go fetcher.run(peersToSkip)
- go fetcher.run(ctx, peersToSkip)
-
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
select {
case request := <-requester.requestC:
@@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) {
func TestFetcherCancelStopsFetcher(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
// we start the fetcher, and then we immediately cancel the context
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
cancel()
- rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
- defer rcancel()
// we call Request with an active context
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
@@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {
// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
func TestFetcherCancelStopsRequest(t *testing.T) {
+ t.Skip("since context is now per fetcher, this test is likely redundant")
+
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- // we start the fetcher with an active context
- go fetcher.run(ctx, peersToSkip)
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
- rctx, rcancel := context.WithCancel(context.Background())
- rcancel()
+ peersToSkip := &sync.Map{}
+
+ // we start the fetcher with an active context
+ go fetcher.run(peersToSkip)
// we call Request with a cancelled context
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
@@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
}
// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
- rctx = context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
select {
case <-requester.requestC:
@@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
func TestFetcherOfferUsesSource(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
// start the fetcher
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
- rctx := context.Background()
// call the Offer function with the source peer
- fetcher.Offer(rctx, &sourcePeerID)
+ fetcher.Offer(&sourcePeerID)
// fetcher should not initiate request
select {
@@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
}
// call Request after the Offer
- rctx = context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// there should be exactly 1 request coming from fetcher
var request *Request
@@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) {
func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
// start the fetcher
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
// call Request first
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// there should be a request coming from fetcher
var request *Request
@@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
}
// after the Request call Offer
- fetcher.Offer(context.Background(), &sourcePeerID)
+ fetcher.Offer(&sourcePeerID)
// there should be a request coming from fetcher
select {
@@ -283,25 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
func TestFetcherRetryOnTimeout(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
- peersToSkip := &sync.Map{}
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
// set searchTimeOut to low value so the test is quicker
- defer func(t time.Duration) {
- searchTimeout = t
- }(searchTimeout)
- searchTimeout = 250 * time.Millisecond
+ fetcher.searchTimeout = 250 * time.Millisecond
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ peersToSkip := &sync.Map{}
// start the fetcher
- go fetcher.run(ctx, peersToSkip)
+ go fetcher.run(peersToSkip)
// call the fetch function with an active context
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ fetcher.Request(0)
// after 100ms the first request should be initiated
time.Sleep(100 * time.Millisecond)
@@ -343,7 +336,7 @@ func TestFetcherFactory(t *testing.T) {
fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
- fetcher.Request(context.Background(), 0)
+ fetcher.Request(0)
// check if the created fetchFunction really starts a fetcher and initiates a request
select {
@@ -357,23 +350,21 @@ func TestFetcherFactory(t *testing.T) {
func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
-
- // make sure searchTimeout is long so it is sure the request is not retried because of timeout
- defer func(t time.Duration) {
- searchTimeout = t
- }(searchTimeout)
- searchTimeout = 10 * time.Second
-
- peersToSkip := &sync.Map{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- go fetcher.run(ctx, peersToSkip)
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
- rctx := context.Background()
- fetcher.Request(rctx, 0)
+ // make sure the searchTimeout is long so it is sure the request is not
+ // retried because of timeout
+ fetcher.searchTimeout = 10 * time.Second
+
+ peersToSkip := &sync.Map{}
+
+ go fetcher.run(peersToSkip)
+
+ fetcher.Request(0)
select {
case <-requester.requestC:
@@ -466,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
func TestFetcherMaxHopCount(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
- fetcher := NewFetcher(addr, requester.doRequest, true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- peersToSkip := &sync.Map{}
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
- go fetcher.run(ctx, peersToSkip)
+ peersToSkip := &sync.Map{}
- rctx := context.Background()
- fetcher.Request(rctx, maxHopCount)
+ go fetcher.run(peersToSkip)
// if hopCount is already at max no request should be initiated
select {
diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go
index a29e73083..fea4347ea 100644
--- a/swarm/network/hive_test.go
+++ b/swarm/network/hive_test.go
@@ -18,9 +18,9 @@ package network
import (
"io/ioutil"
- "log"
"os"
"testing"
+ "time"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/state"
@@ -35,6 +35,8 @@ func newHiveTester(t *testing.T, params *HiveParams, n int, store state.Store) (
return newBzzBaseTester(t, n, addr, DiscoverySpec, pp.Run), pp
}
+// TestRegisterAndConnect verifies that the protocol runs successfully
+// and that the peer connection exists afterwards
func TestRegisterAndConnect(t *testing.T) {
params := NewHiveParams()
s, pp := newHiveTester(t, params, 1, nil)
@@ -43,25 +45,57 @@ func TestRegisterAndConnect(t *testing.T) {
raddr := NewAddr(node)
pp.Register(raddr)
- // start the hive and wait for the connection
+ // start the hive
err := pp.Start(s.Server)
if err != nil {
t.Fatal(err)
}
defer pp.Stop()
- // retrieve and broadcast
+
+ // both hive connect and disconect check have time delays
+ // therefore we need to verify that peer is connected
+ // so that we are sure that the disconnect timeout doesn't complete
+ // before the hive connect method is run at least once
+ timeout := time.After(time.Second)
+ for {
+ select {
+ case <-timeout:
+ t.Fatalf("expected connection")
+ default:
+ }
+ i := 0
+ pp.Kademlia.EachConn(nil, 256, func(addr *Peer, po int) bool {
+ i++
+ return true
+ })
+ if i > 0 {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
+
+ // check that the connection actually exists
+ // the timeout error means no disconnection events
+ // were received within the a certain timeout
err = s.TestDisconnected(&p2ptest.Disconnect{
Peer: s.Nodes[0].ID(),
Error: nil,
})
if err == nil || err.Error() != "timed out waiting for peers to disconnect" {
- t.Fatalf("expected peer to connect")
+ t.Fatalf("expected no disconnection event")
}
}
+// TestHiveStatePersistance creates a protocol simulation with n peers for a node
+// After protocols complete, the node is shut down and the state is stored.
+// Another simulation is created, where 0 nodes are created, but where the stored state is passed
+// The test succeeds if all the peers from the stored state are known after the protocols of the
+// second simulation have completed
+//
+// Actual connectivity is not in scope for this test, as the peers loaded from state are not known to
+// the simulation; the test only verifies that the peers are known to the node
func TestHiveStatePersistance(t *testing.T) {
- log.SetOutput(os.Stdout)
dir, err := ioutil.TempDir("", "hive_test_store")
if err != nil {
@@ -84,7 +118,8 @@ func TestHiveStatePersistance(t *testing.T) {
peers[raddr.String()] = true
}
- // start the hive and wait for the connection
+ // start and stop the hive
+ // the known peers should be saved upon stopping
err = pp.Start(s.Server)
if err != nil {
t.Fatal(err)
@@ -92,15 +127,15 @@ func TestHiveStatePersistance(t *testing.T) {
pp.Stop()
store.Close()
- persistedStore, err := state.NewDBStore(dir) //start the hive with an empty dbstore
+ // start the hive with an empty dbstore
+ persistedStore, err := state.NewDBStore(dir)
if err != nil {
t.Fatal(err)
}
- s1, pp := newHiveTester(t, params, 1, persistedStore)
-
- //start the hive and wait for the connection
+ s1, pp := newHiveTester(t, params, 0, persistedStore)
+ // start the hive and check that we know of all expected peers
pp.Start(s1.Server)
i := 0
pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int) bool {
@@ -108,10 +143,13 @@ func TestHiveStatePersistance(t *testing.T) {
i++
return true
})
+ // TODO remove this line when verified that test passes
+ time.Sleep(time.Second)
if i != 5 {
- t.Errorf("invalid number of entries: got %v, want %v", i, 5)
+ t.Fatalf("invalid number of entries: got %v, want %v", i, 5)
}
if len(peers) != 0 {
t.Fatalf("%d peers left over: %v", len(peers), peers)
}
+
}
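
The connection check in TestRegisterAndConnect polls Kademlia.EachConn because both the hive connect and disconnect paths are time-delayed. A hedged sketch of that wait loop factored into a helper, assuming only the EachConn signature shown above (the deadline and the millisecond poll interval are the test's arbitrary values):

    // waitForOneConn polls until at least one live connection appears or the deadline passes
    func waitForOneConn(k *Kademlia, deadline time.Duration) error {
        timeout := time.After(deadline)
        for {
            select {
            case <-timeout:
                return errors.New("expected connection")
            default:
            }
            conns := 0
            k.EachConn(nil, 256, func(p *Peer, po int) bool {
                conns++
                return true // keep iterating to count all connections
            })
            if conns > 0 {
                return nil
            }
            time.Sleep(time.Millisecond)
        }
    }
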
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index 7d52f26f7..146f39106 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/pot"
+ sv "github.com/ethereum/go-ethereum/swarm/version"
)
/*
@@ -168,82 +169,115 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error {
return nil
}
-// SuggestPeer returns a known peer for the lowest proximity bin for the
-// lowest bincount below depth
-// naturally if there is an empty row it returns a peer for that
-func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) {
+// SuggestPeer returns an unconnected peer address as a peer suggestion for connection
+func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) {
k.lock.Lock()
defer k.lock.Unlock()
- minsize := k.MinBinSize
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- // if there is a callable neighbour within the current proxBin, connect
- // this makes sure nearest neighbour set is fully connected
- var ppo int
- k.addrs.EachNeighbour(k.base, Pof, func(val pot.Val, po int) bool {
- if po < depth {
- return false
+ radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
+ // collect undersaturated bins in ascending order of number of connected peers
+ // and from shallow to deep (ascending order of PO)
+ // insert them in a map of bin arrays, keyed with the number of connected peers
+ saturation := make(map[int][]int)
+ var lastPO int // the last non-empty PO bin in the iteration
+ saturationDepth = -1 // the deepest PO such that all shallower bins have >= k.MinBinSize peers
+ var pastDepth bool // whether po of iteration >= depth
+ k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
+ // process skipped empty bins
+ for ; lastPO < po; lastPO++ {
+ // find the lowest unsaturated bin
+ if saturationDepth == -1 {
+ saturationDepth = lastPO
+ }
+ // if there is an empty bin, depth is surely passed
+ pastDepth = true
+ saturation[0] = append(saturation[0], lastPO)
}
- e := val.(*entry)
- c := k.callable(e)
- if c {
- a = e.BzzAddr
+ lastPO = po + 1
+ // past radius, depth is surely passed
+ if po >= radius {
+ pastDepth = true
}
- ppo = po
- return !c
- })
- if a != nil {
- log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo))
- return a, 0, false
- }
-
- var bpo []int
- prev := -1
- k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
- prev++
- for ; prev < po; prev++ {
- bpo = append(bpo, prev)
- minsize = 0
+ // beyond depth the bin is treated as unsaturated even if size >= k.MinBinSize
+ // in order to achieve full connectivity to all neighbours
+ if pastDepth && size >= k.MinBinSize {
+ size = k.MinBinSize - 1
}
- if size < minsize {
- bpo = append(bpo, po)
- minsize = size
+ // process non-empty unsaturated bins
+ if size < k.MinBinSize {
+ // find the lowest unsaturated bin
+ if saturationDepth == -1 {
+ saturationDepth = po
+ }
+ saturation[size] = append(saturation[size], po)
}
- return size > 0 && po < depth
+ return true
+ })
+ // to trigger peer requests for peers closer than closest connection, include
+ // all bins from nearest connection upto nearest address as unsaturated
+ var nearestAddrAt int
+ k.addrs.EachNeighbour(k.base, Pof, func(_ pot.Val, po int) bool {
+ nearestAddrAt = po
+ return false
})
- // all buckets are full, ie., minsize == k.MinBinSize
- if len(bpo) == 0 {
+ // including bins as size 0 has the effect that requesting connection
+ // is prioritised over non-empty shallower bins
+ for ; lastPO <= nearestAddrAt; lastPO++ {
+ saturation[0] = append(saturation[0], lastPO)
+ }
+ // all PO bins are saturated, ie., minsize >= k.MinBinSize, no peer suggested
+ if len(saturation) == 0 {
return nil, 0, false
}
- // as long as we got candidate peers to connect to
- // dont ask for new peers (want = false)
- // try to select a candidate peer
- // find the first callable peer
- nxt := bpo[0]
- k.addrs.EachBin(k.base, Pof, nxt, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
- // for each bin (up until depth) we find callable candidate peers
- if po >= depth {
- return false
+ // find the first callable peer in the address book
+ // starting from the bins with smallest size proceeding from shallow to deep
+ // for each bin (up until neighbourhood radius) we find callable candidate peers
+ for size := 0; size < k.MinBinSize && suggestedPeer == nil; size++ {
+ bins, ok := saturation[size]
+ if !ok {
+ // no bin with this size
+ continue
}
- return f(func(val pot.Val) bool {
- e := val.(*entry)
- c := k.callable(e)
- if c {
- a = e.BzzAddr
+ cur := 0
+ curPO := bins[0]
+ k.addrs.EachBin(k.base, Pof, curPO, func(po, _ int, f func(func(pot.Val) bool) bool) bool {
+ curPO = bins[cur]
+ // find the next bin that has size size
+ if curPO == po {
+ cur++
+ } else {
+ // skip bins that have no addresses
+ for ; cur < len(bins) && curPO < po; cur++ {
+ curPO = bins[cur]
+ }
+ if po < curPO {
+ cur--
+ return true
+ }
+ // stop if there are no addresses
+ if curPO < po {
+ return false
+ }
}
- return !c
+ // curPO found
+ // find a callable peer out of the addresses in the unsaturated bin
+ // stop if found
+ f(func(val pot.Val) bool {
+ e := val.(*entry)
+ if k.callable(e) {
+ suggestedPeer = e.BzzAddr
+ return false
+ }
+ return true
+ })
+ return cur < len(bins) && suggestedPeer == nil
})
- })
- // found a candidate
- if a != nil {
- return a, 0, false
}
- // no candidate peer found, request for the short bin
- var changed bool
- if uint8(nxt) < k.depth {
- k.depth = uint8(nxt)
- changed = true
+
+ if uint8(saturationDepth) < k.depth {
+ k.depth = uint8(saturationDepth)
+ return suggestedPeer, saturationDepth, true
}
- return a, nxt, changed
+ return suggestedPeer, 0, false
}
// On inserts the peer as a kademlia peer into the live peers
@@ -319,6 +353,9 @@ func (k *Kademlia) sendNeighbourhoodDepthChange() {
// Not receiving from the returned channel will block Register function
// when address count value changes.
func (k *Kademlia) AddrCountC() <-chan int {
+ k.lock.Lock()
+ defer k.lock.Unlock()
+
if k.addrCountC == nil {
k.addrCountC = make(chan int)
}
@@ -398,29 +435,25 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
})
}
+// NeighbourhoodDepth returns the depth for the pot, see depthForPot
func (k *Kademlia) NeighbourhoodDepth() (depth int) {
k.lock.RLock()
defer k.lock.RUnlock()
return depthForPot(k.conns, k.NeighbourhoodSize, k.base)
}
-// depthForPot returns the proximity order that defines the distance of
-// the nearest neighbour set with cardinality >= NeighbourhoodSize
-// if there is altogether less than NeighbourhoodSize peers it returns 0
+// neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia
+// neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize
+// i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether
+// contain at least neighbourhoodSize connected peers
+// if there is altogether less than neighbourhoodSize peers connected, it returns 0
// caller must hold the lock
-func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
+func neighbourhoodRadiusForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
if p.Size() <= neighbourhoodSize {
return 0
}
-
// total number of peers in iteration
var size int
-
- // determining the depth is a two-step process
- // first we find the proximity bin of the shallowest of the NeighbourhoodSize peers
- // the numeric value of depth cannot be higher than this
- var maxDepth int
-
f := func(v pot.Val, i int) bool {
// po == 256 means that addr is the pivot address(self)
if i == 256 {
@@ -431,13 +464,30 @@ func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int
// this means we have all nn-peers.
// depth is by default set to the bin of the farthest nn-peer
if size == neighbourhoodSize {
- maxDepth = i
+ depth = i
return false
}
return true
}
p.EachNeighbour(pivotAddr, Pof, f)
+ return depth
+}
+
+// depthForPot returns the depth for the pot
+// depth is the radius of the minimal extension of nearest neighbourhood that
+// includes all empty PO bins. I.e., depth is the deepest PO such that
+// - it is not deeper than neighbourhood radius
+// - all bins shallower than depth are not empty
+// caller must hold the lock
+func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) {
+ if p.Size() <= neighbourhoodSize {
+ return 0
+ }
+ // determining the depth is a two-step process
+ // first we find the proximity bin of the shallowest of the neighbourhoodSize peers
+ // the numeric value of depth cannot be higher than this
+ maxDepth := neighbourhoodRadiusForPot(p, neighbourhoodSize, pivotAddr)
// the second step is to test for empty bins in order from shallowest to deepest
// if an empty bin is found, this will be the actual depth
@@ -506,6 +556,9 @@ func (k *Kademlia) string() string {
var rows []string
rows = append(rows, "=========================================================================")
+ if len(sv.GitCommit) > 0 {
+ rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit))
+ }
rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3]))
rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize))
@@ -575,7 +628,8 @@ func (k *Kademlia) string() string {
// used for testing only
// TODO move to separate testing tools file
type PeerPot struct {
- NNSet [][]byte
+ NNSet [][]byte
+ PeersPerBin []int
}
// NewPeerPotMap creates a map of pot record of *BzzAddr with keys
@@ -601,6 +655,7 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot {
// all nn-peers
var nns [][]byte
+ peersPerBin := make([]int, depth)
// iterate through the neighbours, going from the deepest to the shallowest
np.EachNeighbour(a, Pof, func(val pot.Val, po int) bool {
@@ -614,38 +669,74 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot {
// a neighbor is any peer in or deeper than the depth
if po >= depth {
nns = append(nns, addr)
- return true
+ } else {
+ // for peers < depth, we just count the number in each bin
+ // the bin is the index of the slice
+ peersPerBin[po]++
}
- return false
+ return true
})
- log.Trace(fmt.Sprintf("%x PeerPotMap NNS: %s", addrs[i][:4], LogAddrs(nns)))
+ log.Trace(fmt.Sprintf("%x PeerPotMap NNS: %s, peersPerBin", addrs[i][:4], LogAddrs(nns)))
ppmap[common.Bytes2Hex(a)] = &PeerPot{
- NNSet: nns,
+ NNSet: nns,
+ PeersPerBin: peersPerBin,
}
}
return ppmap
}
-// saturation iterates through all peers and
-// returns the smallest po value in which the node has less than n peers
-// if the iterator reaches depth, then value for depth is returned
-// TODO move to separate testing tools file
-// TODO this function will stop at the first bin with less than MinBinSize peers, even if there are empty bins between that bin and the depth. This may not be correct behavior
+// saturation returns the smallest po value in which the node has less than MinBinSize peers
+// if the iterator reaches neighbourhood radius, then the last bin + 1 is returned
func (k *Kademlia) saturation() int {
prev := -1
- k.addrs.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
+ radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)
+ k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
prev++
+ if po >= radius {
+ return false
+ }
return prev == po && size >= k.MinBinSize
})
- // TODO evaluate whether this check cannot just as well be done within the eachbin
- depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- if depth < prev {
- return depth
+ if prev < 0 {
+ return 0
}
return prev
}
+// isSaturated returns true if the kademlia is considered saturated, or false if not.
+// It checks this by checking an array of ints called unsaturatedBins; each item in that array corresponds
+// to the bin which is unsaturated (number of connections < k.MinBinSize).
+// The bin is considered unsaturated only if there are actual peers in that PeerPot's bin (peersPerBin)
+// (if there is no peer for a given bin, then no connection could ever be established;
+// in a God's view this is relevant as no more peers will ever appear on that bin)
+func (k *Kademlia) isSaturated(peersPerBin []int, depth int) bool {
+ // depth could be calculated from k but as this is called from `GetHealthInfo()`,
+ // the depth has already been calculated so we can require it as a parameter
+
+ // early check for depth
+ if depth != len(peersPerBin) {
+ return false
+ }
+ unsaturatedBins := make([]int, 0)
+ k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool {
+
+ if po >= depth {
+ return false
+ }
+ log.Trace("peers per bin", "peersPerBin[po]", peersPerBin[po], "po", po)
+ // if there are actually peers in the PeerPot who can fulfill k.MinBinSize
+ if size < k.MinBinSize && size < peersPerBin[po] {
+ log.Trace("connections for po", "po", po, "size", size)
+ unsaturatedBins = append(unsaturatedBins, po)
+ }
+ return true
+ })
+
+ log.Trace("list of unsaturated bins", "unsaturatedBins", unsaturatedBins)
+ return len(unsaturatedBins) == 0
+}
+
// knowNeighbours tests if all neighbours in the peerpot
// are found among the peers known to the kademlia
// It is used in Healthy function for testing only
@@ -728,11 +819,13 @@ type Health struct {
ConnectNN bool // whether node is connected to all its neighbours
CountConnectNN int // amount of neighbours connected to
MissingConnectNN [][]byte // which neighbours we should have been connected to but we're not
- Saturated bool // whether we are connected to all the peers we would have liked to
- Hive string
+ // Saturated: if in all bins < depth number of connections >= MinBinsize or,
+ // if number of connections < MinBinSize, to the number of available peers in that bin
+ Saturated bool
+ Hive string
}
-// Healthy reports the health state of the kademlia connectivity
+// GetHealthInfo reports the health state of the kademlia connectivity
//
// The PeerPot argument provides an all-knowing view of the network
// The resulting Health object is a result of comparisons between
@@ -740,13 +833,19 @@ type Health struct {
// what SHOULD it have been when we take all we know about the network into consideration.
//
// used for testing only
-func (k *Kademlia) Healthy(pp *PeerPot) *Health {
+func (k *Kademlia) GetHealthInfo(pp *PeerPot) *Health {
k.lock.RLock()
defer k.lock.RUnlock()
+ if len(pp.NNSet) < k.NeighbourhoodSize {
+ log.Warn("peerpot NNSet < NeighbourhoodSize")
+ }
gotnn, countgotnn, culpritsgotnn := k.connectedNeighbours(pp.NNSet)
knownn, countknownn, culpritsknownn := k.knowNeighbours(pp.NNSet)
depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
- saturated := k.saturation() < depth
+
+ // check saturation
+ saturated := k.isSaturated(pp.PeersPerBin, depth)
+
log.Trace(fmt.Sprintf("%08x: healthy: knowNNs: %v, gotNNs: %v, saturated: %v\n", k.base, knownn, gotnn, saturated))
return &Health{
KnowNN: knownn,
@@ -759,3 +858,13 @@ func (k *Kademlia) Healthy(pp *PeerPot) *Health {
Hive: k.string(),
}
}
+
+// Healthy return the strict interpretation of `Healthy` given a `Health` struct
+// definition of strict health: all conditions must be true:
+// - we at least know one peer
+// - we know all neighbors
+// - we are connected to all known neighbors
+// - it is saturated
+func (h *Health) Healthy() bool {
+ return h.KnowNN && h.ConnectNN && h.CountKnowNN > 0 && h.Saturated
+}
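
With the split above, Healthy() is now a pure predicate on the Health struct returned by GetHealthInfo. A minimal sketch of the health-check flow, following checkHealth in kademlia_test.go (isNodeHealthy is a hypothetical helper; addrs is the all-knowing set of overlay addresses, including the node's own base address):

    func isNodeHealthy(k *Kademlia) bool {
        addrs := [][]byte{k.BaseAddr()}
        k.EachAddr(nil, 255, func(addr *BzzAddr, po int) bool {
            addrs = append(addrs, addr.Address())
            return true
        })
        // build the all-knowing PeerPot view and look up this node's record
        pp := NewPeerPotMap(k.NeighbourhoodSize, addrs)
        info := k.GetHealthInfo(pp[common.Bytes2Hex(k.BaseAddr())])
        // strict health: know at least one peer, know and connect to all neighbours, and be saturated
        return info.Healthy()
    }
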
diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go
index fcb277fde..b4663eee5 100644
--- a/swarm/network/kademlia_test.go
+++ b/swarm/network/kademlia_test.go
@@ -17,7 +17,6 @@
package network
import (
- "bytes"
"fmt"
"os"
"testing"
@@ -43,39 +42,46 @@ func testKadPeerAddr(s string) *BzzAddr {
func newTestKademliaParams() *KadParams {
params := NewKadParams()
- // TODO why is this 1?
- params.MinBinSize = 1
+ params.MinBinSize = 2
params.NeighbourhoodSize = 2
return params
}
-func newTestKademlia(b string) *Kademlia {
+type testKademlia struct {
+ *Kademlia
+ t *testing.T
+}
+
+func newTestKademlia(t *testing.T, b string) *testKademlia {
base := pot.NewAddressFromString(b)
- return NewKademlia(base, newTestKademliaParams())
+ return &testKademlia{
+ Kademlia: NewKademlia(base, newTestKademliaParams()),
+ t: t,
+ }
}
-func newTestKadPeer(k *Kademlia, s string, lightNode bool) *Peer {
- return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s), LightNode: lightNode}, k)
+func (tk *testKademlia) newTestKadPeer(s string, lightNode bool) *Peer {
+ return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s), LightNode: lightNode}, tk.Kademlia)
}
-func On(k *Kademlia, ons ...string) {
+func (tk *testKademlia) On(ons ...string) {
for _, s := range ons {
- k.On(newTestKadPeer(k, s, false))
+ tk.Kademlia.On(tk.newTestKadPeer(s, false))
}
}
-func Off(k *Kademlia, offs ...string) {
+func (tk *testKademlia) Off(offs ...string) {
for _, s := range offs {
- k.Off(newTestKadPeer(k, s, false))
+ tk.Kademlia.Off(tk.newTestKadPeer(s, false))
}
}
-func Register(k *Kademlia, regs ...string) {
+func (tk *testKademlia) Register(regs ...string) {
var as []*BzzAddr
for _, s := range regs {
as = append(as, testKadPeerAddr(s))
}
- err := k.Register(as...)
+ err := tk.Kademlia.Register(as...)
if err != nil {
panic(err.Error())
}
@@ -162,6 +168,46 @@ func TestNeighbourhoodDepth(t *testing.T) {
testNum++
}
+// TestHighMinBinSize tests that the saturation function also works
+// if MinBinSize is > 2, the connection count is < k.MinBinSize
+// and there are more peers available than connected
+func TestHighMinBinSize(t *testing.T) {
+ // a function to test for different MinBinSize values
+ testKad := func(minBinSize int) {
+ // create a test kademlia
+ tk := newTestKademlia(t, "11111111")
+ // set its MinBinSize to desired value
+ tk.KadParams.MinBinSize = minBinSize
+
+ // add a couple of peers (so we have NN and depth)
+ tk.On("00000000") // bin 0
+ tk.On("11100000") // bin 3
+ tk.On("11110000") // bin 4
+
+ first := "10000000" // add a first peer at bin 1
+ tk.Register(first) // register it
+ // we now have one registered peer at bin 1;
+ // iterate and connect one peer at each iteration;
+ // should be unhealthy until at minBinSize - 1
+ // we connect the unconnected but registered peer
+ for i := 1; i < minBinSize; i++ {
+ peer := fmt.Sprintf("1000%b", 8|i)
+ tk.On(peer)
+ if i == minBinSize-1 {
+ tk.On(first)
+ tk.checkHealth(true)
+ return
+ }
+ tk.checkHealth(false)
+ }
+ }
+ // test MinBinSizes of 3 to 5
+ testMinBinSizes := []int{3, 4, 5}
+ for _, k := range testMinBinSizes {
+ testKad(k)
+ }
+}
+
// TestHealthStrict tests the simplest definition of health
// Which means whether we are connected to all neighbors we know of
func TestHealthStrict(t *testing.T) {
@@ -169,100 +215,151 @@ func TestHealthStrict(t *testing.T) {
// base address is all zeros
// no peers
// unhealthy (and lonely)
- k := newTestKademlia("11111111")
- assertHealth(t, k, false, false)
+ tk := newTestKademlia(t, "11111111")
+ tk.checkHealth(false)
// know one peer but not connected
// unhealthy
- Register(k, "11100000")
- log.Trace(k.String())
- assertHealth(t, k, false, false)
+ tk.Register("11100000")
+ tk.checkHealth(false)
// know one peer and connected
- // healthy
- On(k, "11100000")
- assertHealth(t, k, true, false)
+ // unhealthy: not saturated
+ tk.On("11100000")
+ tk.checkHealth(true)
// know two peers, only one connected
// unhealthy
- Register(k, "11111100")
- log.Trace(k.String())
- assertHealth(t, k, false, false)
+ tk.Register("11111100")
+ tk.checkHealth(false)
// know two peers and connected to both
// healthy
- On(k, "11111100")
- assertHealth(t, k, true, false)
+ tk.On("11111100")
+ tk.checkHealth(true)
// know three peers, connected to the two deepest
// healthy
- Register(k, "00000000")
- log.Trace(k.String())
- assertHealth(t, k, true, false)
+ tk.Register("00000000")
+ tk.checkHealth(false)
// know three peers, connected to all three
// healthy
- On(k, "00000000")
- assertHealth(t, k, true, false)
+ tk.On("00000000")
+ tk.checkHealth(true)
// add fourth peer deeper than current depth
// unhealthy
- Register(k, "11110000")
- log.Trace(k.String())
- assertHealth(t, k, false, false)
+ tk.Register("11110000")
+ tk.checkHealth(false)
// connected to three deepest peers
// healthy
- On(k, "11110000")
- assertHealth(t, k, true, false)
+ tk.On("11110000")
+ tk.checkHealth(true)
// add additional peer in same bin as deepest peer
// unhealthy
- Register(k, "11111101")
- log.Trace(k.String())
- assertHealth(t, k, false, false)
+ tk.Register("11111101")
+ tk.checkHealth(false)
// four deepest of five peers connected
// healthy
- On(k, "11111101")
- assertHealth(t, k, true, false)
+ tk.On("11111101")
+ tk.checkHealth(true)
+
+ // add additional peer in bin 0
+ // unhealthy: unsaturated bin 0, 2 known but 1 connected
+ tk.Register("00000001")
+ tk.checkHealth(false)
+
+ // Connect second in bin 0
+ // healthy
+ tk.On("00000001")
+ tk.checkHealth(true)
+
+ // add peer in bin 1
+ // unhealthy, as it is known but not connected
+ tk.Register("10000000")
+ tk.checkHealth(false)
+
+ // connect peer in bin 1
+ // depth change, is now 1
+ // healthy, 1 peer in bin 1 known and connected
+ tk.On("10000000")
+ tk.checkHealth(true)
+
+ // add second peer in bin 1
+ // unhealthy, as it is known but not connected
+ tk.Register("10000001")
+ tk.checkHealth(false)
+
+ // connect second peer in bin 1
+ // healthy,
+ tk.On("10000001")
+ tk.checkHealth(true)
+
+ // connect third peer in bin 1
+ // healthy,
+ tk.On("10000011")
+ tk.checkHealth(true)
+
+ // add peer in bin 2
+ // unhealthy, no depth change
+ tk.Register("11000000")
+ tk.checkHealth(false)
+
+ // connect peer in bin 2
+ // depth change - as we already have peers in bin 3 and 4,
+ // we have contiguous bins, no bin < po 5 is empty -> depth 5
+ // healthy, every bin < depth has the max available peers,
+ // even if they are < MinBinSize
+ tk.On("11000000")
+ tk.checkHealth(true)
+
+ // add peer in bin 2
+ // unhealthy, peer bin is below depth 5 but
+ // has more available peers (2) than connected ones (1)
+ // --> unsaturated
+ tk.Register("11000011")
+ tk.checkHealth(false)
}
-func assertHealth(t *testing.T, k *Kademlia, expectHealthy bool, expectSaturation bool) {
- t.Helper()
- kid := common.Bytes2Hex(k.BaseAddr())
- addrs := [][]byte{k.BaseAddr()}
- k.EachAddr(nil, 255, func(addr *BzzAddr, po int) bool {
+func (tk *testKademlia) checkHealth(expectHealthy bool) {
+ tk.t.Helper()
+ kid := common.Bytes2Hex(tk.BaseAddr())
+ addrs := [][]byte{tk.BaseAddr()}
+ tk.EachAddr(nil, 255, func(addr *BzzAddr, po int) bool {
addrs = append(addrs, addr.Address())
return true
})
- pp := NewPeerPotMap(k.NeighbourhoodSize, addrs)
- healthParams := k.Healthy(pp[kid])
+ pp := NewPeerPotMap(tk.NeighbourhoodSize, addrs)
+ healthParams := tk.GetHealthInfo(pp[kid])
// definition of health, all conditions but be true:
// - we at least know one peer
// - we know all neighbors
// - we are connected to all known neighbors
- health := healthParams.KnowNN && healthParams.ConnectNN && healthParams.CountKnowNN > 0
+ health := healthParams.Healthy()
if expectHealthy != health {
- t.Fatalf("expected kademlia health %v, is %v\n%v", expectHealthy, health, k.String())
+ tk.t.Fatalf("expected kademlia health %v, is %v\n%v", expectHealthy, health, tk.String())
}
}
-func testSuggestPeer(k *Kademlia, expAddr string, expPo int, expWant bool) error {
- addr, o, want := k.SuggestPeer()
- log.Trace("suggestpeer return", "a", addr, "o", o, "want", want)
+func (tk *testKademlia) checkSuggestPeer(expAddr string, expDepth int, expChanged bool) {
+ tk.t.Helper()
+ addr, depth, changed := tk.SuggestPeer()
+ log.Trace("suggestPeer return", "addr", addr, "depth", depth, "changed", changed)
if binStr(addr) != expAddr {
- return fmt.Errorf("incorrect peer address suggested. expected %v, got %v", expAddr, binStr(addr))
+ tk.t.Fatalf("incorrect peer address suggested. expected %v, got %v", expAddr, binStr(addr))
}
- if o != expPo {
- return fmt.Errorf("incorrect prox order suggested. expected %v, got %v", expPo, o)
+ if depth != expDepth {
+ tk.t.Fatalf("incorrect saturation depth suggested. expected %v, got %v", expDepth, depth)
}
- if want != expWant {
- return fmt.Errorf("expected SuggestPeer to want peers: %v", expWant)
+ if changed != expChanged {
+ tk.t.Fatalf("expected depth change = %v, got %v", expChanged, changed)
}
- return nil
}
func binStr(a *BzzAddr) string {
@@ -272,609 +369,184 @@ func binStr(a *BzzAddr) string {
return pot.ToBin(a.Address())[:8]
}
-// TODO explain why this bug occurred and how it should have been mitigated
-func TestSuggestPeerBug(t *testing.T) {
- // 2 row gap, unsaturated proxbin, no callables -> want PO 0
- k := newTestKademlia("00000000")
- On(k,
- "10000000", "11000000",
- "01000000",
-
- "00010000", "00011000",
- )
- Off(k,
- "01000000",
- )
- err := testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
-}
-
func TestSuggestPeerFindPeers(t *testing.T) {
- t.Skip("The SuggestPeers implementation seems to have weaknesses exposed by the change in the new depth calculation. The results are no longer predictable")
-
- testnum := 0
- // test 0
- // 2 row gap, unsaturated proxbin, no callables -> want PO 0
- k := newTestKademlia("00000000")
- On(k, "00100000")
- err := testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk := newTestKademlia(t, "00000000")
+ tk.On("00100000")
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 1
- // 2 row gap, saturated proxbin, no callables -> want PO 0
- On(k, "00010000")
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("00010000")
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 2
- // 1 row gap (1 less), saturated proxbin, no callables -> want PO 1
- On(k, "10000000")
- err = testSuggestPeer(k, "<nil>", 1, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
-
- // test 3
- // no gap (1 less), saturated proxbin, no callables -> do not want more
- On(k, "01000000", "00100001")
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("10000000", "10000001")
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 4
- // oversaturated proxbin, > do not want more
- On(k, "00100001")
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("01000000")
+ tk.Off("10000001")
+ tk.checkSuggestPeer("10000001", 0, true)
- // test 5
- // reintroduce gap, disconnected peer callable
- Off(k, "01000000")
- log.Trace(k.String())
- err = testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("00100001")
+ tk.Off("01000000")
+ tk.checkSuggestPeer("01000000", 0, false)
- // test 6
// second time disconnected peer not callable
// with reasonably set Interval
- log.Trace("foo")
- log.Trace(k.String())
- err = testSuggestPeer(k, "<nil>", 1, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 6
// on and off again, peer callable again
- On(k, "01000000")
- Off(k, "01000000")
- log.Trace(k.String())
- err = testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("01000000")
+ tk.Off("01000000")
+ tk.checkSuggestPeer("01000000", 0, false)
- // test 7
- // new closer peer appears, it is immediately wanted
- On(k, "01000000")
- Register(k, "00010001")
- err = testSuggestPeer(k, "00010001", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
-
- // test 8
- // PO1 disconnects
- On(k, "00010001")
- log.Info(k.String())
- Off(k, "01000000")
- log.Info(k.String())
- // second time, gap filling
- err = testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("01000000", "10000001")
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 9
- On(k, "01000000")
- log.Info(k.String())
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.Register("00010001")
+ tk.checkSuggestPeer("00010001", 0, false)
- // test 10
- k.MinBinSize = 2
- log.Info(k.String())
- err = testSuggestPeer(k, "<nil>", 0, true)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("00010001")
+ tk.Off("01000000")
+ tk.checkSuggestPeer("01000000", 0, false)
- // test 11
- Register(k, "01000001")
- log.Info(k.String())
- err = testSuggestPeer(k, "01000001", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("01000000")
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 12
- On(k, "10000001")
- log.Trace(fmt.Sprintf("Kad:\n%v", k.String()))
- err = testSuggestPeer(k, "<nil>", 1, true)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.Register("01000001")
+ tk.checkSuggestPeer("01000001", 0, false)
- // test 13
- On(k, "01000001")
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.On("01000001")
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 14
- k.MinBinSize = 3
- Register(k, "10000010")
- err = testSuggestPeer(k, "10000010", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.Register("10000010", "01000010", "00100010")
+ tk.checkSuggestPeer("<nil>", 0, false)
- // test 15
- On(k, "10000010")
- err = testSuggestPeer(k, "<nil>", 1, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.Register("00010010")
+ tk.checkSuggestPeer("00010010", 0, false)
- // test 16
- On(k, "01000010")
- err = testSuggestPeer(k, "<nil>", 2, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.Off("00100001")
+ tk.checkSuggestPeer("00100010", 2, true)
- // test 17
- On(k, "00100010")
- err = testSuggestPeer(k, "<nil>", 3, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.Off("01000001")
+ tk.checkSuggestPeer("01000010", 1, true)
- // test 18
- On(k, "00010010")
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatalf("%d %v", testnum, err.Error())
- }
- testnum++
+ tk.checkSuggestPeer("01000001", 0, false)
+ tk.checkSuggestPeer("00100001", 0, false)
+ tk.checkSuggestPeer("<nil>", 0, false)
+
+ tk.On("01000001", "00100001")
+ tk.Register("10000100", "01000100", "00100100")
+ tk.Register("00000100", "00000101", "00000110")
+ tk.Register("00000010", "00000011", "00000001")
+
+ tk.checkSuggestPeer("00000110", 0, false)
+ tk.checkSuggestPeer("00000101", 0, false)
+ tk.checkSuggestPeer("00000100", 0, false)
+ tk.checkSuggestPeer("00000011", 0, false)
+ tk.checkSuggestPeer("00000010", 0, false)
+ tk.checkSuggestPeer("00000001", 0, false)
+ tk.checkSuggestPeer("<nil>", 0, false)
}
// a node should stay in the address book if it's removed from the kademlia
func TestOffEffectingAddressBookNormalNode(t *testing.T) {
- k := newTestKademlia("00000000")
+ tk := newTestKademlia(t, "00000000")
// peer added to kademlia
- k.On(newTestKadPeer(k, "01000000", false))
+ tk.On("01000000")
// peer should be in the address book
- if k.addrs.Size() != 1 {
+ if tk.addrs.Size() != 1 {
t.Fatal("known peer addresses should contain 1 entry")
}
// peer should be among live connections
- if k.conns.Size() != 1 {
+ if tk.conns.Size() != 1 {
t.Fatal("live peers should contain 1 entry")
}
// remove peer from kademlia
- k.Off(newTestKadPeer(k, "01000000", false))
+ tk.Off("01000000")
// peer should be in the address book
- if k.addrs.Size() != 1 {
+ if tk.addrs.Size() != 1 {
t.Fatal("known peer addresses should contain 1 entry")
}
// peer should not be among live connections
- if k.conns.Size() != 0 {
+ if tk.conns.Size() != 0 {
t.Fatal("live peers should contain 0 entry")
}
}
// a light node should not be in the address book
func TestOffEffectingAddressBookLightNode(t *testing.T) {
- k := newTestKademlia("00000000")
+ tk := newTestKademlia(t, "00000000")
// light node peer added to kademlia
- k.On(newTestKadPeer(k, "01000000", true))
+ tk.Kademlia.On(tk.newTestKadPeer("01000000", true))
// peer should not be in the address book
- if k.addrs.Size() != 0 {
+ if tk.addrs.Size() != 0 {
t.Fatal("known peer addresses should contain 0 entry")
}
// peer should be among live connections
- if k.conns.Size() != 1 {
+ if tk.conns.Size() != 1 {
t.Fatal("live peers should contain 1 entry")
}
// remove peer from kademlia
- k.Off(newTestKadPeer(k, "01000000", true))
+ tk.Kademlia.Off(tk.newTestKadPeer("01000000", true))
// peer should not be in the address book
- if k.addrs.Size() != 0 {
+ if tk.addrs.Size() != 0 {
t.Fatal("known peer addresses should contain 0 entry")
}
// peer should not be among live connections
- if k.conns.Size() != 0 {
+ if tk.conns.Size() != 0 {
t.Fatal("live peers should contain 0 entry")
}
}
func TestSuggestPeerRetries(t *testing.T) {
- k := newTestKademlia("00000000")
- k.RetryInterval = int64(300 * time.Millisecond) // cycle
- k.MaxRetries = 50
- k.RetryExponent = 2
+ tk := newTestKademlia(t, "00000000")
+ tk.RetryInterval = int64(300 * time.Millisecond) // cycle
+ tk.MaxRetries = 50
+ tk.RetryExponent = 2
sleep := func(n int) {
- ts := k.RetryInterval
+ ts := tk.RetryInterval
for i := 1; i < n; i++ {
- ts *= int64(k.RetryExponent)
+ ts *= int64(tk.RetryExponent)
}
time.Sleep(time.Duration(ts))
}
- Register(k, "01000000")
- On(k, "00000001", "00000010")
- err := testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.Register("01000000")
+ tk.On("00000001", "00000010")
+ tk.checkSuggestPeer("01000000", 0, false)
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.checkSuggestPeer("<nil>", 0, false)
sleep(1)
- err = testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.checkSuggestPeer("01000000", 0, false)
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.checkSuggestPeer("<nil>", 0, false)
sleep(1)
- err = testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.checkSuggestPeer("01000000", 0, false)
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.checkSuggestPeer("<nil>", 0, false)
sleep(2)
- err = testSuggestPeer(k, "01000000", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.checkSuggestPeer("01000000", 0, false)
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
+ tk.checkSuggestPeer("<nil>", 0, false)
sleep(2)
- err = testSuggestPeer(k, "<nil>", 0, false)
- if err != nil {
- t.Fatal(err.Error())
- }
-
+ tk.checkSuggestPeer("<nil>", 0, false)
}
func TestKademliaHiveString(t *testing.T) {
- k := newTestKademlia("00000000")
- On(k, "01000000", "00100000")
- Register(k, "10000000", "10000001")
- k.MaxProxDisplay = 8
- h := k.String()
- expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 1, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n========================================================================="
+ tk := newTestKademlia(t, "00000000")
+ tk.On("01000000", "00100000")
+ tk.Register("10000000", "10000001")
+ tk.MaxProxDisplay = 8
+ h := tk.String()
+ expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n========================================================================="
if expH[104:] != h[104:] {
t.Fatalf("incorrect hive output. expected %v, got %v", expH, h)
}
}
-// testKademliaCase constructs the kademlia and PeerPot map to validate
-// the SuggestPeer and Healthy methods for provided hex-encoded addresses.
-// Argument pivotAddr is the address of the kademlia.
-func testKademliaCase(t *testing.T, pivotAddr string, addrs ...string) {
-
- t.Skip("this test relies on SuggestPeer which is now not reliable. See description in TestSuggestPeerFindPeers")
- addr := common.Hex2Bytes(pivotAddr)
- var byteAddrs [][]byte
- for _, ahex := range addrs {
- byteAddrs = append(byteAddrs, common.Hex2Bytes(ahex))
- }
-
- k := NewKademlia(addr, NewKadParams())
-
- // our pivot kademlia is the last one in the array
- for _, a := range byteAddrs {
- if bytes.Equal(a, addr) {
- continue
- }
- p := &BzzAddr{OAddr: a, UAddr: a}
- if err := k.Register(p); err != nil {
- t.Fatalf("a %x addr %x: %v", a, addr, err)
- }
- }
-
- ppmap := NewPeerPotMap(k.NeighbourhoodSize, byteAddrs)
-
- pp := ppmap[pivotAddr]
-
- for {
- a, _, _ := k.SuggestPeer()
- if a == nil {
- break
- }
- k.On(NewPeer(&BzzPeer{BzzAddr: a}, k))
- }
-
- h := k.Healthy(pp)
- if !(h.ConnectNN && h.KnowNN && h.CountKnowNN > 0) {
- t.Fatalf("not healthy: %#v\n%v", h, k.String())
- }
-}
-
-/*
-The regression test for the following invalid kademlia edge case.
-
-Addresses used in this test are discovered as part of the simulation network
-in higher level tests for streaming. They were generated randomly.
-
-=========================================================================
-Mon Apr 9 12:18:24 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 7efef1
-population: 9 (49), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4
-000 2 d7e5 ec56 | 18 ec56 (0) d7e5 (0) d9e0 (0) c735 (0)
-001 2 18f1 3176 | 14 18f1 (0) 10bb (0) 10d1 (0) 0421 (0)
-002 2 52aa 47cd | 11 52aa (0) 51d9 (0) 5161 (0) 5130 (0)
-003 1 646e | 1 646e (0)
-004 0 | 3 769c (0) 76d1 (0) 7656 (0)
-============ DEPTH: 5 ==========================================
-005 1 7a48 | 1 7a48 (0)
-006 1 7cbd | 1 7cbd (0)
-007 0 | 0
-008 0 | 0
-009 0 | 0
-010 0 | 0
-011 0 | 0
-012 0 | 0
-013 0 | 0
-014 0 | 0
-015 0 | 0
-=========================================================================
-*/
-func TestKademliaCase1(t *testing.T) {
- testKademliaCase(t,
- "7efef1c41d77f843ad167be95f6660567eb8a4a59f39240000cce2e0d65baf8e",
- "ec560e6a4806aa37f147ee83687f3cf044d9953e61eedb8c34b6d50d9e2c5623",
- "646e9540c84f6a2f9cf6585d45a4c219573b4fd1b64a3c9a1386fc5cf98c0d4d",
- "18f13c5fba653781019025ab10e8d2fdc916d6448729268afe9e928ffcdbb8e8",
- "317617acf99b4ffddda8a736f8fc6c6ede0bf690bc23d834123823e6d03e2f69",
- "d7e52d9647a5d1c27a68c3ee65d543be3947ae4b68537b236d71ef9cb15fb9ab",
- "7a48f75f8ca60487ae42d6f92b785581b40b91f2da551ae73d5eae46640e02e8",
- "7cbd42350bde8e18ae5b955b5450f8e2cef3419f92fbf5598160c60fd78619f0",
- "52aa3ddec61f4d48dd505a2385403c634f6ad06ee1d99c5c90a5ba6006f9af9c",
- "47cdb6fa93eeb8bc91a417ff4e3b14a9c2ea85137462e2f575fae97f0c4be60d",
- "5161943eb42e2a03e715fe8afa1009ff5200060c870ead6ab103f63f26cb107f",
- "a38eaa1255f76bf883ca0830c86e8c4bb7eed259a8348aae9b03f21f90105bee",
- "b2522bdf1ab26f324e75424fdf6e493b47e8a27687fe76347607b344fc010075",
- "5bd7213964efb2580b91d02ac31ef126838abeba342f5dbdbe8d4d03562671a2",
- "0b531adb82744768b694d7f94f73d4f0c9de591266108daeb8c74066bfc9c9ca",
- "28501f59f70e888d399570145ed884353e017443c675aa12731ada7c87ea14f7",
- "4a45f1fc63e1a9cb9dfa44c98da2f3d20c2923e5d75ff60b2db9d1bdb0c54d51",
- "b193431ee35cd32de95805e7c1c749450c47486595aae7195ea6b6019a64fd61",
- "baebf36a1e35a7ed834e1c72faf44ba16c159fa47d3289ceb3ca35fefa8739b5",
- "a3659bd32e05fa36c8d20dbaaed8362bf1a8a7bd116aed62d8a43a2efbdf513f",
- "10d1b50881a4770ebebdd0a75589dabb931e6716747b0f65fd6b080b88c4fdb6",
- "3c76b8ca5c7ce6a03320646826213f59229626bf5b9d25da0c3ec0662dcb8ff3",
- "4d72a04ddeb851a68cd197ef9a92a3e2ff01fbbff638e64929dd1a9c2e150112",
- "c7353d320987956075b5bc1668571c7a36c800d5598fdc4832ec6569561e15d1",
- "d9e0c7c90878c20ab7639d5954756f54775404b3483407fe1b483635182734f6",
- "8fca67216b7939c0824fb06c5279901a94da41da9482b000f56df9906736ee75",
- "460719d7f7aa7d7438f0eaf30333484fa3bd0f233632c10ba89e6e46dd3604be",
- "0421d92c8a1c79ed5d01305a3d25aaf22a8f5f9e3d4bc80da47ee16ce20465fe",
- "3441d9d9c0f05820a1bb6459fc7d8ef266a1bd929e7db939a10f544efe8261ea",
- "ab198a66c293586746758468c610e5d3914d4ce629147eff6dd55a31f863ff8f",
- "3a1c8c16b0763f3d2c35269f454ff779d1255e954d2deaf6c040fb3f0bcdc945",
- "5561c0ea3b203e173b11e6aa9d0e621a4e10b1d8b178b8fe375220806557b823",
- "7656caccdc79cd8d7ce66d415cc96a718e8271c62fb35746bfc2b49faf3eebf3",
- "5130594fd54c1652cf2debde2c4204573ed76555d1e26757fe345b409af1544a",
- "76d1e83c71ca246d042e37ff1db181f2776265fbcfdc890ce230bfa617c9c2f0",
- "89580231962624c53968c1b0095b4a2732b2a2640a19fdd7d21fd064fcc0a5ef",
- "3d10d001fff44680c7417dd66ecf2e984f0baa20a9bbcea348583ba5ff210c4f",
- "43754e323f0f3a1155b1852bd6edd55da86b8c4cfe3df8b33733fca50fc202b8",
- "a9e7b1bb763ae6452ddcacd174993f82977d81a85206bb2ae3c842e2d8e19b4c",
- "10bb07da7bc7c7757f74149eff167d528a94a253cdc694a863f4d50054c00b6d",
- "28f0bc1b44658548d6e05dd16d4c2fe77f1da5d48b6774bc4263b045725d0c19",
- "835fbbf1d16ba7347b6e2fc552d6e982148d29c624ea20383850df3c810fa8fc",
- "8e236c56a77d7f46e41e80f7092b1a68cd8e92f6156365f41813ad1ca2c6b6f3",
- "51d9c857e9238c49186e37b4eccf17a82de3d5739f026f6043798ab531456e73",
- "bbddf7db6a682225301f36a9fd5b0d0121d2951753e1681295f3465352ad511f",
- "2690a910c33ee37b91eb6c4e0731d1d345e2dc3b46d308503a6e85bbc242c69e",
- "769ce86aa90b518b7ed382f9fdacfbed93574e18dc98fe6c342e4f9f409c2d5a",
- "ba3bebec689ce51d3e12776c45f80d25164fdfb694a8122d908081aaa2e7122c",
- "3a51f4146ea90a815d0d283d1ceb20b928d8b4d45875e892696986a3c0d8fb9b",
- "81968a2d8fb39114342ee1da85254ec51e0608d7f0f6997c2a8354c260a71009",
- )
-}
-
-/*
-The regression test for the following invalid kademlia edge case.
-
-Addresses used in this test are discovered as part of the simulation network
-in higher level tests for streaming. They were generated randomly.
-
-=========================================================================
-Mon Apr 9 18:43:48 UTC 2018 KΛÐΞMLIΛ hive: queen's address: bc7f3b
-population: 9 (49), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4
-000 2 0f49 67ff | 28 0f49 (0) 0211 (0) 07b2 (0) 0703 (0)
-001 2 e84b f3a4 | 13 f3a4 (0) e84b (0) e58b (0) e60b (0)
-002 1 8dba | 1 8dba (0)
-003 2 a008 ad72 | 2 ad72 (0) a008 (0)
-004 0 | 3 b61f (0) b27f (0) b027 (0)
-============ DEPTH: 5 ==========================================
-005 1 ba19 | 1 ba19 (0)
-006 0 | 0
-007 1 bdd6 | 1 bdd6 (0)
-008 0 | 0
-009 0 | 0
-010 0 | 0
-011 0 | 0
-012 0 | 0
-013 0 | 0
-014 0 | 0
-015 0 | 0
-=========================================================================
-*/
-func TestKademliaCase2(t *testing.T) {
- testKademliaCase(t,
- "bc7f3b6a4a7e3c91b100ca6680b6c06ff407972b88956324ca853295893e0237", "67ffb61d3aa27449d277016188f35f19e2321fbda5008c68cf6303faa080534f", "600cd54c842eadac1729c04abfc369bc244572ca76117105b9dd910283b82730", "d955a05409650de151218557425105a8aa2867bb6a0e0462fa1cf90abcf87ad6", "7a6b726de45abdf7bb3e5fd9fb0dc8932270ca4dedef92238c80c05bcdb570e3", "263e99424ebfdb652adb4e3dcd27d59e11bb7ae1c057b3ef6f390d0228006254", "ba195d1a53aafde68e661c64d39db8c2a73505bf336125c15c3560de3b48b7ed", "3458c762169937115f67cabc35a6c384ed70293a8aec37b077a6c1b8e02d510e", "4ef4dc2e28ac6efdba57e134ac24dd4e0be68b9d54f7006515eb9509105f700c", "2a8782b79b0c24b9714dfd2c8ff1932bebc08aa6520b4eaeaa59ff781238890c", "625d02e960506f4524e9cdeac85b33faf3ea437fceadbd478b62b78720cf24fc", "e051a36a8c8637f520ba259c9ed3fadaf740dadc6a04c3f0e21778ebd4cd6ac4", "e34bc014fa2504f707bb3d904872b56c2fa250bee3cb19a147a0418541f1bd90", "28036dc79add95799916893890add5d8972f3b95325a509d6ded3d448f4dc652", "1b013c407794fa2e4c955d8f51cbc6bd78588a174b6548246b291281304b5409", "34f71b68698e1534095ff23ee9c35bf64c7f12b8463e7c6f6b19c25cf03928b4", "c712c6e9bbb7076832972a95890e340b94ed735935c3c0bb788e61f011b59479", "a008d5becdcda4b9dbfdaafc3cec586cf61dcf2d4b713b6168fff02e3b9f0b08", "29de15555cdbebaab214009e416ee92f947dcec5dab9894129f50f1b17138f34", "5df9449f700bd4b5a23688b68b293f2e92fa6ca524c93bc6bb9936efba9d9ada", "3ab0168a5f87fedc6a39b53c628256ac87a98670d8691bbdaaecec22418d13a2", "1ee299b2d2a74a568494130e6869e66d57982d345c482a0e0eeb285ac219ae3b", "e0e0e3b860cea9b7a74cf1b0675cc632dc64e80a02f20bbc5e96e2e8bb670606", "dc1ba6f169b0fcdcca021dcebaf39fe5d4875e7e69b854fad65687c1d7719ec0", "d321f73e42fcfb1d3a303eddf018ca5dffdcfd5567cd5ec1212f045f6a07e47d", "070320c3da7b542e5ca8aaf6a0a53d2bb5113ed264ab1db2dceee17c729edcb1", "17d314d65fdd136b50d182d2c8f5edf16e7838c2be8cf2c00abe4b406dbcd1d8", "e60b99e0a06f7d2d99d84085f67cdf8cc22a9ae22c339365d80f90289834a2b4", "02115771e18932e1f67a45f11f5bf743c5dae97fbc477d34d35c996012420eac", "3102a40eb2e5060353dd19bf61eeec8782dd1bebfcb57f4c796912252b591827", "8dbaf231062f2dc7ddaba5f9c7761b0c21292be51bf8c2ef503f31d4a2f63f79", "b02787b713c83a9f9183216310f04251994e04c2763a9024731562e8978e7cc4", "b27fe6cd33989e10909ce794c4b0b88feae286b614a59d49a3444c1a7b51ea82", "07b2d2c94fdc6fd148fe23be2ed9eff54f5e12548f29ed8416e6860fc894466f", "e58bf9f451ef62ac44ff0a9bb0610ec0fd14d423235954f0d3695e83017cbfc4", "bdd600b91bb79d1ee0053b854de308cfaa7e2abce575ea6815a0a7b3449609c2", "0f49c93c1edc7999920b21977cedd51a763940dac32e319feb9c1df2da0f3071", "7cbf0297cd41acf655cd6f960d7aaf61479edb4189d5c001cbc730861f0deb41", "79265193778d87ad626a5f59397bc075872d7302a12634ce2451a767d0a82da2", "2fe7d705f7c370b9243dbaafe007d555ff58d218822fca49d347b12a0282457c", "e84bc0c83d05e55a0080eed41dda5a795da4b9313a4da697142e69a65834cbb3", "cc4d278bd9aa0e9fb3cd8d2e0d68fb791aab5de4b120b845c409effbed47a180", "1a2317a8646cd4b6d3c4aa4cc25f676533abb689cf180787db216880a1239ad8", "cbafd6568cf8e99076208e6b6843f5808a7087897c67aad0c54694669398f889", "7b7c8357255fc37b4dae0e1af61589035fd39ff627e0938c6b3da8b4e4ec5d23", "2b8d782c1f5bac46c922cf439f6aa79f91e9ba5ffc0020d58455188a2075b334", "b61f45af2306705740742e76197a119235584ced01ef3f7cf3d4370f6c557cd1", "2775612e7cdae2780bf494c370bdcbe69c55e4a1363b1dc79ea0135e61221cce", "f3a49bb22f40885e961299abfa697a7df690a79f067bf3a4847a3ad48d826c9f", "ad724ac218dc133c0aadf4618eae21fdd0c2f3787af279846b49e2b4f97ff167",
- )
-}
-
-/*
-The regression test for the following invalid kademlia edge case.
-
-Addresses used in this test are discovered as part of the simulation network
-in higher level tests for streaming. They were generated randomly.
-
-=========================================================================
-Mon Apr 9 19:04:35 UTC 2018 KΛÐΞMLIΛ hive: queen's address: b4822e
-population: 8 (49), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4
-000 2 786c 774b | 29 774b (0) 786c (0) 7a79 (0) 7d2f (0)
-001 2 d9de cf19 | 10 cf19 (0) d9de (0) d2ff (0) d2a2 (0)
-002 2 8ca1 8d74 | 5 8d74 (0) 8ca1 (0) 9793 (0) 9f51 (0)
-003 0 | 0
-004 0 | 3 bfac (0) bcbb (0) bde9 (0)
-005 0 | 0
-============ DEPTH: 6 ==========================================
-006 1 b660 | 1 b660 (0)
-007 0 | 0
-008 1 b450 | 1 b450 (0)
-009 0 | 0
-010 0 | 0
-011 0 | 0
-012 0 | 0
-013 0 | 0
-014 0 | 0
-015 0 | 0
-=========================================================================
-*/
-func TestKademliaCase3(t *testing.T) {
- testKademliaCase(t,
- "b4822e874a01b94ac3a35c821e6db131e785c2fcbb3556e84b36102caf09b091", "2ecf54ea38d58f9cfc3862e54e5854a7c506fbc640e0b38e46d7d45a19794999", "442374092be50fc7392e8dd3f6fab3158ff7f14f26ff98060aed9b2eecf0b97d", "b450a4a67fcfa3b976cf023d8f1f15052b727f712198ce901630efe2f95db191", "9a7291638eb1c989a6dd6661a42c735b23ac6605b5d3e428aa5ffe650e892c85", "67f62eeab9804cfcac02b25ebeab9113d1b9d03dd5200b1c5a324cc0163e722f", "2e4a0e4b53bca4a9d7e2734150e9f579f29a255ade18a268461b20d026c9ee90", "30dd79c5fcdaa1b106f6960c45c9fde7c046aa3d931088d98c52ab759d0b2ac4", "97936fb5a581e59753c54fa5feec493714f2218245f61f97a62eafd4699433e4", "3a2899b6e129e3e193f6e2aefb82589c948c246d2ec1d4272af32ef3b2660f44", "f0e2a8aa88e67269e9952431ef12e5b29b7f41a1871fbfc38567fad95655d607", "7fa12b3f3c5f8383bfc644b958f72a486969733fa097d8952b3eb4f7b4f73192", "360c167aad5fc992656d6010ec45fdce5bcd492ad9608bc515e2be70d4e430c1", "fe21bc969b3d8e5a64a6484a829c1e04208f26f3cd4de6afcbc172a5bd17f1f1", "b660a1f40141d7ccd282fe5bd9838744119bd1cb3780498b5173578cc5ad308f", "44dcb3370e76680e2fba8cd986ad45ff0b77ca45680ee8d950e47922c4af6226", "8ca126923d17fccb689647307b89f38aa14e2a7b9ebcf3c1e31ccf3d2291a3bc", "f0ae19ae9ce6329327cbf42baf090e084c196b0877d8c7b69997e0123be23ef8", "d2a2a217385158e3e1e348883a14bc423e57daa12077e8c49797d16121ea0810", "f5467ccd85bb4ebe768527db520a210459969a5f1fae6e07b43f519799f0b224", "68be5fd9f9d142a5099e3609011fe3bab7bb992c595999e31e0b3d1668dfb3cf", "4d49a8a476e4934afc6b5c36db9bece3ed1804f20b952da5a21b2b0de766aa73", "ea7155745ef3fb2d099513887a2ba279333ced65c65facbd890ce58bd3fce772", "cf19f51f4e848053d289ac95a9138cdd23fc3077ae913cd58cda8cc7a521b2e1", "590b1cd41c7e6144e76b5cd515a3a4d0a4317624620a3f1685f43ae68bdcd890", "d2ffe0626b5f94a7e00fa0b506e7455a3d9399c15800db108d5e715ef5f6e346", "69630878c50a91f6c2edd23a706bfa0b50bd5661672a37d67bab38e6bca3b698", "445e9067079899bb5faafaca915ae6c0f6b1b730a5a628835dd827636f7feb1e", "6461c77491f1c4825958949f23c153e6e1759a5be53abbcee17c9da3867f3141", "23a235f4083771ccc207771daceda700b525a59ab586788d4f6892e69e34a6e2", "bde99f79ef41a81607ddcf92b9f95dcbc6c3537e91e8bf740e193dc73b19485e", "177957c0e5f0fbd12b88022a91768095d193830986caec8d888097d3ff4310b8", "bcbbdbaa4cdf8352422072f332e05111b732354a35c4d7c617ce1fc3b8b42a5a", "774b6717fdfb0d1629fb9d4c04a9ca40079ae2955d7f82e897477055ed017abb", "16443bf625be6d39ecaa6f114e5d2c1d47a64bfd3c13808d94b55b6b6acef2ee", "8d7495d9008066505ed00ce8198af82bfa5a6b4c08768b4c9fb3aa4eb0b0cca2", "15800849a53349508cb382959527f6c3cf1a46158ff1e6e2316b7dea7967e35f", "7a792f0f4a2b731781d1b244b2a57947f1a2e32900a1c0793449f9f7ae18a7b7", "5e517c2832c9deaa7df77c7bad4d20fd6eda2b7815e155e68bc48238fac1416f", "9f51a14f0019c72bd1d472706d8c80a18c1873c6a0663e754b60eae8094483d7", "7d2fabb565122521d22ba99fed9e5be6a458fbc93156d54db27d97a00b8c3a97", "786c9e412a7db4ec278891fa534caa9a1d1a028c631c6f3aeb9c4d96ad895c36", "3bd6341d40641c2632a5a0cd7a63553a04e251efd7195897a1d27e02a7a8bfde", "31efd1f5fb57b8cff0318d77a1a9e8d67e1d1c8d18ce90f99c3a240dff48cdc8", "d9de3e1156ce1380150948acbcfecd99c96e7f4b0bc97745f4681593d017f74f", "427a2201e09f9583cd990c03b81b58148c297d474a3b50f498d83b1c7a9414cd", "bfaca11596d3dec406a9fcf5d97536516dfe7f0e3b12078428a7e1700e25218a", "351c4770a097248a650008152d0cab5825d048bef770da7f3364f59d1e721bc0", "ee00f205d1486b2be7381d962bd2867263758e880529e4e2bfedfa613bbc0e71", "6aa3b6418d89e3348e4859c823ef4d6d7cd46aa7f7e77aba586c4214d760d8f8",
- )
-}
-
-/*
-The regression test for the following invalid kademlia edge case.
-
-Addresses used in this test are discovered as part of the simulation network
-in higher level tests for streaming. They were generated randomly.
-
-=========================================================================
-Mon Apr 9 19:16:25 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 9a90fe
-population: 8 (49), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4
-000 2 72ef 4e6c | 24 0b1e (0) 0d66 (0) 17f5 (0) 17e8 (0)
-001 2 fc2b fa47 | 13 fa47 (0) fc2b (0) fffd (0) ecef (0)
-002 2 b847 afa8 | 6 afa8 (0) ad77 (0) bb7c (0) b847 (0)
-003 0 | 0
-004 0 | 4 91fc (0) 957d (0) 9482 (0) 949a (0)
-============ DEPTH: 5 ==========================================
-005 1 9ccf | 1 9ccf (0)
-006 0 | 0
-007 1 9bb2 | 1 9bb2 (0)
-008 0 | 0
-009 0 | 0
-010 0 | 0
-011 0 | 0
-012 0 | 0
-013 0 | 0
-014 0 | 0
-015 0 | 0
-=========================================================================
-*/
-func TestKademliaCase4(t *testing.T) {
- testKademliaCase(t,
- "9a90fe3506277244549064b8c3276abb06284a199d9063a97331947f2b7da7f4",
- "c19359eddef24b7be1a833b4475f212cd944263627a53f9ef4837d106c247730", "fc2b6fef99ef947f7e57c3df376891769e2a2fd83d2b8e634e0fc1e91eaa080c", "ecefc0e1a8ea7bb4b48c469e077401fce175dd75294255b96c4e54f6a2950a55", "bb7ce598efc056bba343cc2614aa3f67a575557561290b44c73a63f8f433f9f7", "55fbee6ca52dfd7f0be0db969ee8e524b654ab4f0cce7c05d83887d7d2a15460", "afa852b6b319998c6a283cc0c82d2f5b8e9410075d7700f3012761f1cfbd0f76", "36c370cfb63f2087971ba6e58d7585b04e16b8f0da335efb91554c2dd8fe191c", "6be41e029985edebc901fb77fc4fb65516b6d85086e2a98bfa3159c99391e585", "dd3cfc72ea553e7d2b28f0037a65646b30955b929d29ba4c40f4a2a811248e77", "da3a8f18e09c7b0ca235c4e33e1441a5188f1df023138bf207753ee63e768f7d", "de9e3ab4dc572d54a2d4b878329fd832bb51a149f4ce167316eeb177b61e7e01", "4e6c1ecde6ed917706257fe020a1d02d2e9d87fca4c85f0f7b132491008c5032", "72ef04b77a070e13463b3529dd312bcacfb7a12d20dc597f5ec3de0501e9b834", "3fef57186675d524ab8bb1f54ba8cb68610babca1247c0c46dbb60aed003c69d", "1d8e6b71f7a052865d6558d4ba44ad5fab7b908cc1badf5766822e1c20d0d823", "6be2f2b4ffa173014d4ec7df157d289744a2bda54bb876b264ccfa898a0da315", "b0ba3fff8643f9985c744327b0c4c869763509fd5da2de9a80a4a0a082021255", "9ccf40b9406ba2e6567101fb9b4e5334a9ec74263eff47267da266ba45e6c158", "d7347f02c180a448e60f73931845062ce00048750b584790278e9c93ef31ad81", "b68c6359a22b3bee6fecb8804311cfd816648ea31d530c9fb48e477e029d707a", "0d668a18ad7c2820214df6df95a6c855ce19fb1cb765f8ca620e45db76686d37", "3fbd2663bff65533246f1fabb9f38086854c6218aeb3dc9ac6ac73d4f0988f91", "949aa5719ca846052bfaa1b38c97b6eca3df3e24c0e0630042c6bccafbb4cdb5", "77b8a2b917bef5d54f3792183b014cca7798f713ff14fe0b2ac79b4c9f6f996d", "17e853cbd8dc00cba3cd9ffeb36f26a9f41a0eb92f80b62c2cda16771c935388", "5f682ed7a8cf2f98387c3def7c97f9f05ae39e39d393eeca3cf621268d6347f8", "ad77487eaf11fd8084ba4517a51766eb0e5b77dd3492dfa79aa3a2802fb29d20", "d247cfcacf9a8200ebaddf639f8c926ab0a001abe682f40df3785e80ed124e91", "195589442e11907eede1ee6524157f1125f68399f3170c835ff81c603b069f6c", "5b5ca0a67f3c54e7d3a6a862ef56168ec9ed1f4945e6c24de6d336b2be2e6f8c", "56430e4caa253015f1f998dce4a48a88af1953f68e94eca14f53074ae9c3e467", "0b1eed6a5bf612d1d8e08f5c546f3d12e838568fd3aa43ed4c537f10c65545d6", "7058db19a56dfff01988ac4a62e1310597f9c8d7ebde6890dadabf047d722d39", "b847380d6888ff7cd11402d086b19eccc40950b52c9d67e73cb4f8462f5df078", "df6c048419a2290ab546d527e9eeba349e7f7e1759bafe4adac507ce60ef9670", "91fc5b4b24fc3fbfea7f9a3d0f0437cb5733c0c2345d8bdffd7048d6e3b8a37b", "957d8ea51b37523952b6f5ae95462fcd4aed1483ef32cc80b69580aaeee03606", "efa82e4e91ad9ab781977400e9ac0bb9de7389aaedebdae979b73d1d3b8d72b0", "7400c9f3f3fc0cc6fe8cc37ab24b9771f44e9f78be913f73cd35fc4be030d6bd", "9bb28f4122d61f7bb56fe27ef706159fb802fef0f5de9dfa32c9c5b3183235f1", "40a8de6e98953498b806614532ea4abf8b99ad7f9719fb68203a6eae2efa5b2a", "412de0b218b8f7dcacc9205cd16ffb4eca5b838f46a2f4f9f534026061a47308", "17f56ecad51075080680ad9faa0fd8946b824d3296ddb20be07f9809fe8d1c5a", "fffd4e7ae885a41948a342b6647955a7ec8a8039039f510cff467ef597675457", "35e78e11b5ac46a29dd04ab0043136c3291f4ca56cb949ace33111ed56395463", "94824fc80230af82077c83bfc01dc9675b1f9d3d538b1e5f41c21ac753598691", "fa470ae314ca3fce493f21b423eef2a49522e09126f6f2326fa3c9cac0b344f7", "7078860b5b621b21ac7b95f9fc4739c8235ce5066a8b9bd7d938146a34fa88ec", "eea53560f0428bfd2eca4f86a5ce9dec5ff1309129a975d73465c1c9e9da71d1",
- )
-}
-
-/*
-The regression test for the following invalid kademlia edge case.
-
-Addresses used in this test are discovered as part of the simulation network
-in higher level tests for streaming. They were generated randomly.
-
-=========================================================================
-Mon Apr 9 19:25:18 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 5dd5c7
-population: 13 (49), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4
-000 2 e528 fad0 | 22 fad0 (0) e528 (0) e3bb (0) ed13 (0)
-001 3 3f30 18e0 1dd3 | 7 3f30 (0) 23db (0) 10b6 (0) 18e0 (0)
-002 4 7c54 7804 61e4 60f9 | 10 61e4 (0) 60f9 (0) 636c (0) 7186 (0)
-003 2 40ae 4bae | 5 4bae (0) 4d5c (0) 403a (0) 40ae (0)
-004 0 | 0
-005 0 | 3 5808 (0) 5a0e (0) 5bdb (0)
-============ DEPTH: 6 ==========================================
-006 2 5f14 5f61 | 2 5f14 (0) 5f61 (0)
-007 0 | 0
-008 0 | 0
-009 0 | 0
-010 0 | 0
-011 0 | 0
-012 0 | 0
-013 0 | 0
-014 0 | 0
-015 0 | 0
-=========================================================================
-*/
-func TestKademliaCase5(t *testing.T) {
- testKademliaCase(t,
- "5dd5c77dd9006a800478fcebb02d48d4036389e7d3c8f6a83b97dbad13f4c0a9",
- "78fafa0809929a1279ece089a51d12457c2d8416dff859aeb2ccc24bb50df5ec", "1dd39b1257e745f147cbbc3cadd609ccd6207c41056dbc4254bba5d2527d3ee5", "5f61dd66d4d94aec8fcc3ce0e7885c7edf30c43143fa730e2841c5d28e3cd081", "8aa8b0472cb351d967e575ad05c4b9f393e76c4b01ef4b3a54aac5283b78abc9", "4502f385152a915b438a6726ce3ea9342e7a6db91a23c2f6bee83a885ed7eb82", "718677a504249db47525e959ef1784bed167e1c46f1e0275b9c7b588e28a3758", "7c54c6ed1f8376323896ed3a4e048866410de189e9599dd89bf312ca4adb96b5", "18e03bd3378126c09e799a497150da5c24c895aedc84b6f0dbae41fc4bac081a", "23db76ac9e6e58d9f5395ca78252513a7b4118b4155f8462d3d5eec62486cadc", "40ae0e8f065e96c7adb7fa39505136401f01780481e678d718b7f6dbb2c906ec", "c1539998b8bae19d339d6bbb691f4e9daeb0e86847545229e80fe0dffe716e92", "ed139d73a2699e205574c08722ca9f030ad2d866c662f1112a276b91421c3cb9", "5bdb19584b7a36d09ca689422ef7e6bb681b8f2558a6b2177a8f7c812f631022", "636c9de7fe234ffc15d67a504c69702c719f626c17461d3f2918e924cd9d69e2", "de4455413ff9335c440d52458c6544191bd58a16d85f700c1de53b62773064ea", "de1963310849527acabc7885b6e345a56406a8f23e35e436b6d9725e69a79a83", "a80a50a467f561210a114cba6c7fb1489ed43a14d61a9edd70e2eb15c31f074d", "7804f12b8d8e6e4b375b242058242068a3809385e05df0e64973cde805cf729c", "60f9aa320c02c6f2e6370aa740cf7cea38083fa95fca8c99552cda52935c1520", "d8da963602390f6c002c00ce62a84b514edfce9ebde035b277a957264bb54d21", "8463d93256e026fe436abad44697152b9a56ac8e06a0583d318e9571b83d073c", "9a3f78fcefb9a05e40a23de55f6153d7a8b9d973ede43a380bf46bb3b3847de1", "e3bb576f4b3760b9ca6bff59326f4ebfc4a669d263fb7d67ab9797adea54ed13", "4d5cdbd6dcca5bdf819a0fe8d175dc55cc96f088d37462acd5ea14bc6296bdbe", "5a0ed28de7b5258c727cb85447071c74c00a5fbba9e6bc0393bc51944d04ab2a", "61e4ddb479c283c638f4edec24353b6cc7a3a13b930824aad016b0996ca93c47", "7e3610868acf714836cafaaa7b8c009a9ac6e3a6d443e5586cf661530a204ee2", "d74b244d4345d2c86e30a097105e4fb133d53c578320285132a952cdaa64416e", "cfeed57d0f935bfab89e3f630a7c97e0b1605f0724d85a008bbfb92cb47863a8", "580837af95055670e20d494978f60c7f1458dc4b9e389fc7aa4982b2aca3bce3", "df55c0c49e6c8a83d82dfa1c307d3bf6a20e18721c80d8ec4f1f68dc0a137ced", "5f149c51ce581ba32a285439a806c063ced01ccd4211cd024e6a615b8f216f95", "1eb76b00aeb127b10dd1b7cd4c3edeb4d812b5a658f0feb13e85c4d2b7c6fe06", "7a56ba7c3fb7cbfb5561a46a75d95d7722096b45771ec16e6fa7bbfab0b35dfe", "4bae85ad88c28470f0015246d530adc0cd1778bdd5145c3c6b538ee50c4e04bd", "afd1892e2a7145c99ec0ebe9ded0d3fec21089b277a68d47f45961ec5e39e7e0", "953138885d7b36b0ef79e46030f8e61fd7037fbe5ce9e0a94d728e8c8d7eab86", "de761613ef305e4f628cb6bf97d7b7dc69a9d513dc233630792de97bcda777a6", "3f3087280063d09504c084bbf7fdf984347a72b50d097fd5b086ffabb5b3fb4c", "7d18a94bb1ebfdef4d3e454d2db8cb772f30ca57920dd1e402184a9e598581a0", "a7d6fbdc9126d9f10d10617f49fb9f5474ffe1b229f76b7dd27cebba30eccb5d", "fad0246303618353d1387ec10c09ee991eb6180697ed3470ed9a6b377695203d", "1cf66e09ea51ee5c23df26615a9e7420be2ac8063f28f60a3bc86020e94fe6f3", "8269cdaa153da7c358b0b940791af74d7c651cd4d3f5ed13acfe6d0f2c539e7f", "90d52eaaa60e74bf1c79106113f2599471a902d7b1c39ac1f55b20604f453c09", "9788fd0c09190a3f3d0541f68073a2f44c2fcc45bb97558a7c319f36c25a75b3", "10b68fc44157ecfdae238ee6c1ce0333f906ad04d1a4cb1505c8e35c3c87fbb0", "e5284117fdf3757920475c786e0004cb00ba0932163659a89b36651a01e57394", "403ad51d911e113dcd5f9ff58c94f6d278886a2a4da64c3ceca2083282c92de3",
- )
-}
-
func newTestDiscoveryPeer(addr pot.Address, kad *Kademlia) *Peer {
rw := &p2p.MsgPipeRW{}
p := p2p.NewPeer(enode.ID{}, "foo", []p2p.Cap{})
diff --git a/swarm/network/networkid_test.go b/swarm/network/networkid_test.go
index 99890118f..9d47cf9f6 100644
--- a/swarm/network/networkid_test.go
+++ b/swarm/network/networkid_test.go
@@ -44,7 +44,7 @@ var (
const (
NumberOfNets = 4
- MaxTimeout = 6
+ MaxTimeout = 15 * time.Second
)
func init() {
@@ -76,13 +76,12 @@ func TestNetworkID(t *testing.T) {
if err != nil {
t.Fatalf("Error setting up network: %v", err)
}
- defer func() {
- //shutdown the snapshot network
- log.Trace("Shutting down network")
- net.Shutdown()
- }()
//let's sleep to ensure all nodes are connected
time.Sleep(1 * time.Second)
+ // shut down the network to avoid race conditions
+ // on accessing the kademlia's global map while network nodes
+ // are accepting messages
+ net.Shutdown()
//for each group sharing the same network ID...
for _, netIDGroup := range nodeMap {
log.Trace("netIDGroup size", "size", len(netIDGroup))
@@ -147,7 +146,7 @@ func setupNetwork(numnodes int) (net *simulations.Network, err error) {
return nil, fmt.Errorf("create node %d rpc client fail: %v", i, err)
}
//now setup and start event watching in order to know when we can upload
- ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
+ ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout)
defer watchCancel()
watchSubscriptionEvents(ctx, nodes[i].ID(), client, errc, quitC)
//on every iteration we connect to all previous ones
diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go
index a4b29239c..6f8eadad2 100644
--- a/swarm/network/protocol.go
+++ b/swarm/network/protocol.go
@@ -67,6 +67,7 @@ type BzzConfig struct {
HiveParams *HiveParams
NetworkID uint64
LightNode bool
+ BootnodeMode bool
}
// Bzz is the swarm protocol bundle
@@ -87,7 +88,7 @@ type Bzz struct {
// * overlay driver
// * peer store
func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
- return &Bzz{
+ bzz := &Bzz{
Hive: NewHive(config.HiveParams, kad, store),
NetworkID: config.NetworkID,
LightNode: config.LightNode,
@@ -96,6 +97,13 @@ func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *p
streamerRun: streamerRun,
streamerSpec: streamerSpec,
}
+
+ if config.BootnodeMode {
+ bzz.streamerRun = nil
+ bzz.streamerSpec = nil
+ }
+
+ return bzz
}
 // UpdateLocalAddr updates the underlay address of the running node
@@ -168,7 +176,7 @@ func (b *Bzz) APIs() []rpc.API {
func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*BzzPeer) error) func(*p2p.Peer, p2p.MsgReadWriter) error {
return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
// wait for the bzz protocol to perform the handshake
- handshake, _ := b.GetHandshake(p.ID())
+ handshake, _ := b.GetOrCreateHandshake(p.ID())
defer b.removeHandshake(p.ID())
select {
case <-handshake.done:
@@ -213,7 +221,7 @@ func (b *Bzz) performHandshake(p *protocols.Peer, handshake *HandshakeMsg) error
// runBzz is the p2p protocol run function for the bzz base protocol
// that negotiates the bzz handshake
func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- handshake, _ := b.GetHandshake(p.ID())
+ handshake, _ := b.GetOrCreateHandshake(p.ID())
if !<-handshake.init {
return fmt.Errorf("%08x: bzz already started on peer %08x", b.localAddr.Over()[:4], p.ID().Bytes()[:4])
}
@@ -303,7 +311,7 @@ func (b *Bzz) removeHandshake(peerID enode.ID) {
}
 // GetHandshake returns the bzz handshake that the remote peer with peerID sent
-func (b *Bzz) GetHandshake(peerID enode.ID) (*HandshakeMsg, bool) {
+func (b *Bzz) GetOrCreateHandshake(peerID enode.ID) (*HandshakeMsg, bool) {
b.mtx.Lock()
defer b.mtx.Unlock()
handshake, found := b.handshakes[peerID]
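For context on the new BootnodeMode flag: a caller opting in sets it on the BzzConfig before constructing Bzz, and NewBzz then drops the streamer spec and run function. The sketch below is illustrative only; the BzzConfig address fields (OverlayAddr, UnderlayAddr) and the surrounding variables are assumptions, not part of this patch.

	// assumed: addr, kad, stateStore, streamerSpec, streamerRun already constructed
	cfg := &network.BzzConfig{
		OverlayAddr:  addr.Over(),
		UnderlayAddr: addr.Under(),
		HiveParams:   network.NewHiveParams(),
		NetworkID:    1,
		BootnodeMode: true, // NewBzz nils out streamerSpec/streamerRun for bootnodes
	}
	bzz := network.NewBzz(cfg, kad, stateStore, streamerSpec, streamerRun)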
diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go
index 58477a7b8..64ce7ba4a 100644
--- a/swarm/network/protocol_test.go
+++ b/swarm/network/protocol_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"os"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
@@ -82,7 +83,7 @@ func newBzzBaseTester(t *testing.T, n int, addr *BzzAddr, spec *protocols.Spec,
return srv(&BzzPeer{Peer: protocols.NewPeer(p, rw, spec), BzzAddr: NewAddr(p.Node())})
}
- s := p2ptest.NewProtocolTester(t, addr.ID(), n, protocol)
+ s := p2ptest.NewProtocolTester(addr.ID(), n, protocol)
for _, node := range s.Nodes {
cs[node.ID().String()] = make(chan bool)
@@ -115,9 +116,9 @@ func newBzz(addr *BzzAddr, lightNode bool) *Bzz {
return bzz
}
-func newBzzHandshakeTester(t *testing.T, n int, addr *BzzAddr, lightNode bool) *bzzTester {
+func newBzzHandshakeTester(n int, addr *BzzAddr, lightNode bool) *bzzTester {
bzz := newBzz(addr, lightNode)
- pt := p2ptest.NewProtocolTester(t, addr.ID(), n, bzz.runBzz)
+ pt := p2ptest.NewProtocolTester(addr.ID(), n, bzz.runBzz)
return &bzzTester{
addr: addr,
@@ -165,7 +166,7 @@ func correctBzzHandshake(addr *BzzAddr, lightNode bool) *HandshakeMsg {
func TestBzzHandshakeNetworkIDMismatch(t *testing.T) {
lightNode := false
addr := RandomAddr()
- s := newBzzHandshakeTester(t, 1, addr, lightNode)
+ s := newBzzHandshakeTester(1, addr, lightNode)
node := s.Nodes[0]
err := s.testHandshake(
@@ -182,7 +183,7 @@ func TestBzzHandshakeNetworkIDMismatch(t *testing.T) {
func TestBzzHandshakeVersionMismatch(t *testing.T) {
lightNode := false
addr := RandomAddr()
- s := newBzzHandshakeTester(t, 1, addr, lightNode)
+ s := newBzzHandshakeTester(1, addr, lightNode)
node := s.Nodes[0]
err := s.testHandshake(
@@ -199,7 +200,7 @@ func TestBzzHandshakeVersionMismatch(t *testing.T) {
func TestBzzHandshakeSuccess(t *testing.T) {
lightNode := false
addr := RandomAddr()
- s := newBzzHandshakeTester(t, 1, addr, lightNode)
+ s := newBzzHandshakeTester(1, addr, lightNode)
node := s.Nodes[0]
err := s.testHandshake(
@@ -224,7 +225,8 @@ func TestBzzHandshakeLightNode(t *testing.T) {
for _, test := range lightNodeTests {
t.Run(test.name, func(t *testing.T) {
randomAddr := RandomAddr()
- pt := newBzzHandshakeTester(t, 1, randomAddr, false)
+ pt := newBzzHandshakeTester(1, randomAddr, false)
+
node := pt.Nodes[0]
addr := NewAddr(node)
@@ -237,8 +239,14 @@ func TestBzzHandshakeLightNode(t *testing.T) {
t.Fatal(err)
}
- if pt.bzz.handshakes[node.ID()].LightNode != test.lightNode {
- t.Fatalf("peer LightNode flag is %v, should be %v", pt.bzz.handshakes[node.ID()].LightNode, test.lightNode)
+ select {
+
+ case <-pt.bzz.handshakes[node.ID()].done:
+ if pt.bzz.handshakes[node.ID()].LightNode != test.lightNode {
+ t.Fatalf("peer LightNode flag is %v, should be %v", pt.bzz.handshakes[node.ID()].LightNode, test.lightNode)
+ }
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timeout")
}
})
}
diff --git a/swarm/network/simulation/kademlia.go b/swarm/network/simulation/kademlia.go
index 6d8d0e0a2..c58d402b0 100644
--- a/swarm/network/simulation/kademlia.go
+++ b/swarm/network/simulation/kademlia.go
@@ -64,7 +64,7 @@ func (s *Simulation) WaitTillHealthy(ctx context.Context) (ill map[enode.ID]*net
addr := common.Bytes2Hex(k.BaseAddr())
pp := ppmap[addr]
//call Healthy RPC
- h := k.Healthy(pp)
+ h := k.GetHealthInfo(pp)
//print info
log.Debug(k.String())
log.Debug("kademlia", "connectNN", h.ConnectNN, "knowNN", h.KnowNN)
diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go
index 08eb83524..24afe51a4 100644
--- a/swarm/network/simulation/node.go
+++ b/swarm/network/simulation/node.go
@@ -44,7 +44,7 @@ func (s *Simulation) NodeIDs() (ids []enode.ID) {
func (s *Simulation) UpNodeIDs() (ids []enode.ID) {
nodes := s.Net.GetNodes()
for _, node := range nodes {
- if node.Up {
+ if node.Up() {
ids = append(ids, node.ID())
}
}
@@ -55,7 +55,7 @@ func (s *Simulation) UpNodeIDs() (ids []enode.ID) {
func (s *Simulation) DownNodeIDs() (ids []enode.ID) {
nodes := s.Net.GetNodes()
for _, node := range nodes {
- if !node.Up {
+ if !node.Up() {
ids = append(ids, node.ID())
}
}
diff --git a/swarm/network/simulation/node_test.go b/swarm/network/simulation/node_test.go
index dc9189c91..bae5afb26 100644
--- a/swarm/network/simulation/node_test.go
+++ b/swarm/network/simulation/node_test.go
@@ -54,7 +54,7 @@ func TestUpDownNodeIDs(t *testing.T) {
gotIDs = sim.UpNodeIDs()
for _, id := range gotIDs {
- if !sim.Net.GetNode(id).Up {
+ if !sim.Net.GetNode(id).Up() {
t.Errorf("node %s should not be down", id)
}
}
@@ -66,7 +66,7 @@ func TestUpDownNodeIDs(t *testing.T) {
gotIDs = sim.DownNodeIDs()
for _, id := range gotIDs {
- if sim.Net.GetNode(id).Up {
+ if sim.Net.GetNode(id).Up() {
t.Errorf("node %s should not be up", id)
}
}
@@ -112,7 +112,7 @@ func TestAddNode(t *testing.T) {
t.Fatal("node not found")
}
- if !n.Up {
+ if !n.Up() {
t.Error("node not started")
}
}
@@ -327,7 +327,7 @@ func TestStartStopNode(t *testing.T) {
if n == nil {
t.Fatal("node not found")
}
- if !n.Up {
+ if !n.Up() {
t.Error("node not started")
}
@@ -335,26 +335,17 @@ func TestStartStopNode(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- if n.Up {
+ if n.Up() {
t.Error("node not stopped")
}
- // Sleep here to ensure that Network.watchPeerEvents defer function
- // has set the `node.Up = false` before we start the node again.
- // p2p/simulations/network.go:215
- //
- // The same node is stopped and started again, and upon start
- // watchPeerEvents is started in a goroutine. If the node is stopped
- // and then very quickly started, that goroutine may be scheduled later
- // then start and force `node.Up = false` in its defer function.
- // This will make this test unreliable.
- time.Sleep(time.Second)
+ waitForPeerEventPropagation()
err = sim.StartNode(id)
if err != nil {
t.Fatal(err)
}
- if !n.Up {
+ if !n.Up() {
t.Error("node not started")
}
}
@@ -377,7 +368,7 @@ func TestStartStopRandomNode(t *testing.T) {
if n == nil {
t.Fatal("node not found")
}
- if n.Up {
+ if n.Up() {
t.Error("node not stopped")
}
@@ -386,16 +377,7 @@ func TestStartStopRandomNode(t *testing.T) {
t.Fatal(err)
}
- // Sleep here to ensure that Network.watchPeerEvents defer function
- // has set the `node.Up = false` before we start the node again.
- // p2p/simulations/network.go:215
- //
- // The same node is stopped and started again, and upon start
- // watchPeerEvents is started in a goroutine. If the node is stopped
- // and then very quickly started, that goroutine may be scheduled later
- // then start and force `node.Up = false` in its defer function.
- // This will make this test unreliable.
- time.Sleep(time.Second)
+ waitForPeerEventPropagation()
idStarted, err := sim.StartRandomNode()
if err != nil {
@@ -426,21 +408,12 @@ func TestStartStopRandomNodes(t *testing.T) {
if n == nil {
t.Fatal("node not found")
}
- if n.Up {
+ if n.Up() {
t.Error("node not stopped")
}
}
- // Sleep here to ensure that Network.watchPeerEvents defer function
- // has set the `node.Up = false` before we start the node again.
- // p2p/simulations/network.go:215
- //
- // The same node is stopped and started again, and upon start
- // watchPeerEvents is started in a goroutine. If the node is stopped
- // and then very quickly started, that goroutine may be scheduled later
- // then start and force `node.Up = false` in its defer function.
- // This will make this test unreliable.
- time.Sleep(time.Second)
+ waitForPeerEventPropagation()
ids, err = sim.StartRandomNodes(2)
if err != nil {
@@ -452,8 +425,20 @@ func TestStartStopRandomNodes(t *testing.T) {
if n == nil {
t.Fatal("node not found")
}
- if !n.Up {
+ if !n.Up() {
t.Error("node not started")
}
}
}
+
+func waitForPeerEventPropagation() {
+ // Sleep here to ensure that Network.watchPeerEvents defer function
+ // has set the `node.Up() = false` before we start the node again.
+ //
+ // The same node is stopped and started again, and upon start
+ // watchPeerEvents is started in a goroutine. If the node is stopped
+ // and then very quickly started, that goroutine may be scheduled later
+ // than start and force `node.Up() = false` in its defer function.
+ // This would make the test unreliable.
+ time.Sleep(1 * time.Second)
+}
diff --git a/swarm/network/simulation/service.go b/swarm/network/simulation/service.go
index 7dd4dc6d8..0ac8149a9 100644
--- a/swarm/network/simulation/service.go
+++ b/swarm/network/simulation/service.go
@@ -52,7 +52,7 @@ func (s *Simulation) Services(name string) (services map[enode.ID]node.Service)
nodes := s.Net.GetNodes()
services = make(map[enode.ID]node.Service)
for _, node := range nodes {
- if !node.Up {
+ if !node.Up() {
continue
}
simNode, ok := node.Node.(*adapters.SimNode)
diff --git a/swarm/network/simulation/simulation_test.go b/swarm/network/simulation/simulation_test.go
index f837f9382..1d0338f59 100644
--- a/swarm/network/simulation/simulation_test.go
+++ b/swarm/network/simulation/simulation_test.go
@@ -124,7 +124,7 @@ func TestClose(t *testing.T) {
var upNodeCount int
for _, n := range sim.Net.GetNodes() {
- if n.Up {
+ if n.Up() {
upNodeCount++
}
}
@@ -140,7 +140,7 @@ func TestClose(t *testing.T) {
upNodeCount = 0
for _, n := range sim.Net.GetNodes() {
- if n.Up {
+ if n.Up() {
upNodeCount++
}
}
diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go
index e5121c477..5227de3bb 100644
--- a/swarm/network/simulations/discovery/discovery_test.go
+++ b/swarm/network/simulations/discovery/discovery_test.go
@@ -18,16 +18,12 @@ package discovery
import (
"context"
- "encoding/json"
- "errors"
"flag"
"fmt"
"io/ioutil"
- "math/rand"
"os"
"path"
"strings"
- "sync"
"testing"
"time"
@@ -86,12 +82,10 @@ func getDbStore(nodeID string) (*state.DBStore, error) {
}
var (
- nodeCount = flag.Int("nodes", 10, "number of nodes to create (default 10)")
- initCount = flag.Int("conns", 1, "number of originally connected peers (default 1)")
- snapshotFile = flag.String("snapshot", "", "path to create snapshot file in")
- loglevel = flag.Int("loglevel", 3, "verbosity of logs")
- rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs")
- serviceOverride = flag.String("services", "", "remove or add services to the node snapshot; prefix with \"+\" to add, \"-\" to remove; example: +pss,-discovery")
+ nodeCount = flag.Int("nodes", 32, "number of nodes to create (default 32)")
+ initCount = flag.Int("conns", 1, "number of originally connected peers (default 1)")
+ loglevel = flag.Int("loglevel", 3, "verbosity of logs")
+ rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs")
)
func init() {
@@ -157,7 +151,6 @@ func testDiscoverySimulationSimAdapter(t *testing.T, nodes, conns int) {
}
func testDiscoverySimulation(t *testing.T, nodes, conns int, adapter adapters.NodeAdapter) {
- t.Skip("discovery tests depend on suggestpeer, which is unreliable after kademlia depth change.")
startedAt := time.Now()
result, err := discoverySimulation(nodes, conns, adapter)
if err != nil {
@@ -185,7 +178,6 @@ func testDiscoverySimulation(t *testing.T, nodes, conns int, adapter adapters.No
}
func testDiscoveryPersistenceSimulation(t *testing.T, nodes, conns int, adapter adapters.NodeAdapter) map[int][]byte {
- t.Skip("discovery tests depend on suggestpeer, which is unreliable after kademlia depth change.")
persistenceEnabled = true
discoveryEnabled = true
@@ -247,25 +239,14 @@ func discoverySimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simul
action := func(ctx context.Context) error {
return nil
}
- wg := sync.WaitGroup{}
for i := range ids {
// collect the overlay addresses, to
addrs = append(addrs, ids[i].Bytes())
- for j := 0; j < conns; j++ {
- var k int
- if j == 0 {
- k = (i + 1) % len(ids)
- } else {
- k = rand.Intn(len(ids))
- }
- wg.Add(1)
- go func(i, k int) {
- defer wg.Done()
- net.Connect(ids[i], ids[k])
- }(i, k)
- }
}
- wg.Wait()
+ err := net.ConnectNodesChain(nil)
+ if err != nil {
+ return nil, err
+ }
log.Debug(fmt.Sprintf("nodes: %v", len(addrs)))
// construct the peer pot, so that kademlia health can be checked
ppmap := network.NewPeerPotMap(network.NewKadParams().NeighbourhoodSize, addrs)
@@ -286,10 +267,10 @@ func discoverySimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simul
}
healthy := &network.Health{}
- if err := client.Call(&healthy, "hive_healthy", ppmap); err != nil {
+ if err := client.Call(&healthy, "hive_getHealthInfo", ppmap[common.Bytes2Hex(id.Bytes())]); err != nil {
return false, fmt.Errorf("error getting node health: %s", err)
}
- log.Info(fmt.Sprintf("node %4s healthy: connected nearest neighbours: %v, know nearest neighbours: %v,\n\n%v", id, healthy.ConnectNN, healthy.KnowNN, healthy.Hive))
+ log.Debug(fmt.Sprintf("node %4s healthy: connected nearest neighbours: %v, know nearest neighbours: %v,\n\n%v", id, healthy.ConnectNN, healthy.KnowNN, healthy.Hive))
return healthy.KnowNN && healthy.ConnectNN, nil
}
@@ -309,40 +290,6 @@ func discoverySimulation(nodes, conns int, adapter adapters.NodeAdapter) (*simul
if result.Error != nil {
return result, nil
}
-
- if *snapshotFile != "" {
- var err error
- var snap *simulations.Snapshot
- if len(*serviceOverride) > 0 {
- var addServices []string
- var removeServices []string
- for _, osvc := range strings.Split(*serviceOverride, ",") {
- if strings.Index(osvc, "+") == 0 {
- addServices = append(addServices, osvc[1:])
- } else if strings.Index(osvc, "-") == 0 {
- removeServices = append(removeServices, osvc[1:])
- } else {
- panic("stick to the rules, you know what they are")
- }
- }
- snap, err = net.SnapshotWithServices(addServices, removeServices)
- } else {
- snap, err = net.Snapshot()
- }
-
- if err != nil {
- return nil, errors.New("no shapshot dude")
- }
- jsonsnapshot, err := json.Marshal(snap)
- if err != nil {
- return nil, fmt.Errorf("corrupt json snapshot: %v", err)
- }
- log.Info("writing snapshot", "file", *snapshotFile)
- err = ioutil.WriteFile(*snapshotFile, jsonsnapshot, 0755)
- if err != nil {
- return nil, err
- }
- }
return result, nil
}
@@ -405,7 +352,7 @@ func discoveryPersistenceSimulation(nodes, conns int, adapter adapters.NodeAdapt
healthy := &network.Health{}
addr := id.String()
ppmap := network.NewPeerPotMap(network.NewKadParams().NeighbourhoodSize, addrs)
- if err := client.Call(&healthy, "hive_healthy", ppmap); err != nil {
+ if err := client.Call(&healthy, "hive_getHealthInfo", ppmap[common.Bytes2Hex(id.Bytes())]); err != nil {
return fmt.Errorf("error getting node health: %s", err)
}
@@ -415,9 +362,6 @@ func discoveryPersistenceSimulation(nodes, conns int, adapter adapters.NodeAdapt
return fmt.Errorf("error getting node string %s", err)
}
log.Info(nodeStr)
- for _, a := range addrs {
- log.Info(common.Bytes2Hex(a))
- }
if !healthy.ConnectNN || healthy.CountKnowNN == 0 {
isHealthy = false
break
@@ -457,23 +401,7 @@ func discoveryPersistenceSimulation(nodes, conns int, adapter adapters.NodeAdapt
return nil
}
- //connects in a chain
- wg := sync.WaitGroup{}
- //connects in a ring
- for i := range ids {
- for j := 1; j <= conns; j++ {
- k := (i + j) % len(ids)
- if k == i {
- k = (k + 1) % len(ids)
- }
- wg.Add(1)
- go func(i, k int) {
- defer wg.Done()
- net.Connect(ids[i], ids[k])
- }(i, k)
- }
- }
- wg.Wait()
+ net.ConnectNodesChain(nil)
log.Debug(fmt.Sprintf("nodes: %v", len(addrs)))
// construct the peer pot, so that kademlia health can be checked
check := func(ctx context.Context, id enode.ID) (bool, error) {
@@ -494,7 +422,7 @@ func discoveryPersistenceSimulation(nodes, conns int, adapter adapters.NodeAdapt
healthy := &network.Health{}
ppmap := network.NewPeerPotMap(network.NewKadParams().NeighbourhoodSize, addrs)
- if err := client.Call(&healthy, "hive_healthy", ppmap); err != nil {
+ if err := client.Call(&healthy, "hive_getHealthInfo", ppmap[common.Bytes2Hex(id.Bytes())]); err != nil {
return false, fmt.Errorf("error getting node health: %s", err)
}
log.Info(fmt.Sprintf("node %4s healthy: got nearest neighbours: %v, know nearest neighbours: %v", id, healthy.ConnectNN, healthy.KnowNN))
diff --git a/swarm/network/simulations/overlay_test.go b/swarm/network/simulations/overlay_test.go
index 6ccdb5ce2..41ed5ed26 100644
--- a/swarm/network/simulations/overlay_test.go
+++ b/swarm/network/simulations/overlay_test.go
@@ -32,7 +32,7 @@ import (
)
var (
- nodeCount = 16
+ nodeCount = 10
)
//This test is used to test the overlay simulation.
@@ -179,7 +179,7 @@ func watchSimEvents(net *simulations.Network, ctx context.Context, trigger chan
case ev := <-events:
//only catch node up events
if ev.Type == simulations.EventTypeNode {
- if ev.Node.Up {
+ if ev.Node.Up() {
log.Debug("got node up event", "event", ev, "node", ev.Node.Config.ID)
select {
case trigger <- ev.Node.Config.ID:
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 29b917d39..afd08d275 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -26,17 +26,19 @@ import (
"math/rand"
"os"
"strings"
+ "sync"
"sync/atomic"
- "testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
@@ -67,7 +69,81 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}
-func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
+// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
+func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
+func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+ return netStore, delivery, cleanup, nil
+}
+
+// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any network.RequestFunc
+func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+ addr := network.NewAddr(ctx.Config.Node())
+
+ netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
+
+ return addr, netStore, delivery, cleanup, nil
+}
+
+func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
+ n := ctx.Config.Node()
+
+ store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ if *useMockStore {
+ store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
+ }
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ localStore := store.(*storage.LocalStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+ delivery := NewDelivery(kad, netStore)
+
+ bucket.Store(bucketKeyStore, store)
+ bucket.Store(bucketKeyDB, netStore)
+ bucket.Store(bucketKeyDelivery, delivery)
+ bucket.Store(bucketKeyFileStore, fileStore)
+
+ cleanup := func() {
+ netStore.Close()
+ os.RemoveAll(datadir)
+ }
+
+ return netStore, delivery, cleanup, nil
+}
+
+func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
@@ -75,7 +151,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
// temp datadir
datadir, err := ioutil.TempDir("", "streamer")
if err != nil {
- return nil, nil, nil, func() {}, err
+ return nil, nil, nil, nil, err
}
removeDataDir := func() {
os.RemoveAll(datadir)
@@ -87,12 +163,14 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
localStore, err := storage.NewTestLocalStoreForAddr(params)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
- return nil, nil, nil, removeDataDir, err
+ removeDataDir()
+ return nil, nil, nil, nil, err
}
delivery := NewDelivery(to, netStore)
@@ -102,10 +180,11 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
streamer.Close()
removeDataDir()
}
- protocolTester := p2ptest.NewProtocolTester(t, addr.ID(), 1, streamer.runProtocol)
+ protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)
- err = waitForPeers(streamer, 1*time.Second, 1)
+ err = waitForPeers(streamer, 10*time.Second, 1)
if err != nil {
+ teardown()
return nil, nil, nil, nil, errors.New("timeout: peer is not created")
}
@@ -138,6 +217,11 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
+// Has is not used in this context; it exists only to fulfill the ChunkStore interface
+func (rrs *roundRobinStore) Has(ctx context.Context, addr storage.Address) bool {
+ panic("RoundRobinStore doesn't support HasChunk")
+}
+
func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
@@ -236,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch
}
return store, datadir, nil
}
+
+// watchDisconnections receives simulation peer events in a new goroutine and sets the returned
+// disconnected boolean to true in case of a disconnect event.
+func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
+ log.Debug("Watching for disconnections")
+ disconnections := sim.PeerEvents(
+ ctx,
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Drop(),
+ )
+ disconnected = new(boolean)
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case d := <-disconnections:
+ if d.Error != nil {
+ log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
+ } else {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
+ }
+ disconnected.set(true)
+ }
+ }
+ }()
+ return disconnected
+}
+
+// boolean is used to concurrently set
+// and read a boolean value.
+type boolean struct {
+ v bool
+ mu sync.RWMutex
+}
+
+// set sets the value.
+func (b *boolean) set(v bool) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ b.v = v
+}
+
+// bool reads the value.
+func (b *boolean) bool() bool {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+
+ return b.v
+}
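Taken together, the helpers above are meant to be used inside a simulation run: watch for drops, run the checks, and only report disconnects if the run itself errored. A minimal sketch of the assumed usage (the Run callback body and result handling are illustrative, not part of this change):

	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
		disconnected := watchDisconnections(ctx, sim)
		defer func() {
			if err != nil && disconnected.bool() {
				err = errors.New("disconnect events received")
			}
		}()
		// upload and retrieval checks go here
		return nil
	})
	if result.Error != nil {
		t.Fatal(result.Error)
	}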
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index e1a13fe8d..fae6994f0 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -144,7 +144,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
ctx, osp = spancontext.StartSpan(
ctx,
"retrieve.request")
- defer osp.Finish()
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil {
@@ -167,6 +166,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
}()
go func() {
+ defer osp.Finish()
chunk, err := d.chunkStore.Get(ctx, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
@@ -213,11 +213,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
ctx, osp = spancontext.StartSpan(
ctx,
"chunk.delivery")
- defer osp.Finish()
processReceivedChunksCount.Inc(1)
go func() {
+ defer osp.Finish()
+
req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil {
@@ -255,8 +256,8 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
return true
}
sp = d.getPeer(id)
+ // sp is nil when we encounter a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol
if sp == nil {
- //log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
return true
}
spID = &id
@@ -271,7 +272,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
Addr: req.Addr,
SkipCheck: req.SkipCheck,
HopCount: req.HopCount,
- }, Top)
+ }, Top, "request.from.peers")
if err != nil {
return nil, nil, err
}
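The tracing change in this file moves osp.Finish() from the handler's defer into the goroutine that does the asynchronous work, so the span covers the store or retrieve operation rather than just the synchronous handler. In isolation the pattern looks roughly like this (names are illustrative, not part of this patch):

	ctx, osp := spancontext.StartSpan(ctx, "chunk.delivery")
	go func() {
		// the span is finished when the async store completes, not when the handler returns
		defer osp.Finish()
		if err := chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)); err != nil {
			log.Error("chunk delivery failed", "err", err)
		}
	}()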
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 70d3829b3..49e4a423a 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -21,9 +21,7 @@ import (
"context"
"errors"
"fmt"
- "os"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -48,11 +46,11 @@ func TestStreamerRetrieveRequest(t *testing.T) {
Retrieval: RetrievalClientOnly,
Syncing: SyncingDisabled,
}
- tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(regOpts)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -97,14 +95,14 @@ func TestStreamerRetrieveRequest(t *testing.T) {
//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
//Should time out as the peer does not have the chunk (no syncing happened previously)
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled, //do no syncing
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -169,14 +167,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// upstream request server receives a retrieve Request and responds with
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -359,14 +357,14 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
}
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
- tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return &testClient{
@@ -455,164 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
}
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
- sim := simulation.New(map[string]simulation.ServiceFunc{
- "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
- store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
-
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- Syncing: SyncingDisabled,
- Retrieval: RetrievalEnabled,
- }, nil)
- bucket.Store(bucketKeyRegistry, r)
-
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ t.Helper()
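+ // run the body as a named subtest so failures identify the nodes/chunkCount/skipCheck combination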
+ t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
- return r, cleanup, nil
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalEnabled,
+ }, nil)
+ bucket.Store(bucketKeyRegistry, r)
- },
- })
- defer sim.Close()
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
- log.Info("Adding nodes to simulation")
- _, err := sim.AddNodesAndConnectChain(nodes)
- if err != nil {
- t.Fatal(err)
- }
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
- log.Info("Starting simulation")
- ctx := context.Background()
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
- nodeIDs := sim.UpNodeIDs()
- //determine the pivot node to be the first node of the simulation
- pivot := nodeIDs[0]
-
- //distribute chunks of a random file into Stores of nodes 1 to nodes
- //we will do this by creating a file store with an underlying round-robin store:
- //the file store will create a hash for the uploaded file, but every chunk will be
- //distributed to different nodes via round-robin scheduling
- log.Debug("Writing file to round-robin file store")
- //to do this, we create an array for chunkstores (length minus one, the pivot node)
- stores := make([]storage.ChunkStore, len(nodeIDs)-1)
- //we then need to get all stores from the sim....
- lStores := sim.NodesItems(bucketKeyStore)
- i := 0
- //...iterate the buckets...
- for id, bucketVal := range lStores {
- //...and remove the one which is the pivot node
- if id == pivot {
- continue
- }
- //the other ones are added to the array...
- stores[i] = bucketVal.(storage.ChunkStore)
- i++
- }
- //...which then gets passed to the round-robin file store
- roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
- //now we can actually upload a (random) file to the round-robin store
- size := chunkCount * chunkSize
- log.Debug("Storing data to file store")
- fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
- // wait until all chunks stored
- if err != nil {
- return err
- }
- err = wait(ctx)
+ log.Info("Adding nodes to simulation")
+ _, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
- return err
+ t.Fatal(err)
}
- log.Debug("Waiting for kademlia")
- // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- return err
- }
-
- //get the pivot node's filestore
- item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
- if !ok {
- return fmt.Errorf("No filestore")
- }
- pivotFileStore := item.(*storage.FileStore)
- log.Debug("Starting retrieval routine")
- retErrC := make(chan error)
- go func() {
- // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
- // we must wait for the peer connections to have started before requesting
- n, err := readAll(pivotFileStore, fileHash)
- log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
- retErrC <- err
- }()
-
- log.Debug("Watching for disconnections")
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
+ log.Info("Starting simulation")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ nodeIDs := sim.UpNodeIDs()
+ //determine the pivot node to be the first node of the simulation
+ pivot := nodeIDs[0]
+
+ //distribute chunks of a random file into Stores of nodes 1 to nodes
+ //we will do this by creating a file store with an underlying round-robin store:
+ //the file store will create a hash for the uploaded file, but every chunk will be
+ //distributed to different nodes via round-robin scheduling
+ log.Debug("Writing file to round-robin file store")
+ //to do this, we create an array for chunkstores (length minus one, the pivot node)
+ stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+ //we then need to get all stores from the sim....
+ lStores := sim.NodesItems(bucketKeyStore)
+ i := 0
+ //...iterate the buckets...
+ for id, bucketVal := range lStores {
+ //...and remove the one which is the pivot node
+ if id == pivot {
+ continue
}
+ //the other ones are added to the array...
+ stores[i] = bucketVal.(storage.ChunkStore)
+ i++
}
- }()
- defer func() {
+ //...which then gets passed to the round-robin file store
+ roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+ //now we can actually upload a (random) file to the round-robin store
+ size := chunkCount * chunkSize
+ log.Debug("Storing data to file store")
+ fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+ // wait until all chunks stored
+ if err != nil {
+ return err
+ }
+ err = wait(ctx)
if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
+ return err
+ }
+
+ log.Debug("Waiting for kademlia")
+ // TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+ if _, err := sim.WaitTillHealthy(ctx); err != nil {
+ return err
+ }
+
+ //get the pivot node's filestore
+ item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
+ }
+ pivotFileStore := item.(*storage.FileStore)
+ log.Debug("Starting retrieval routine")
+ retErrC := make(chan error)
+ go func() {
+ // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+ // we must wait for the peer connections to have started before requesting
+ n, err := readAll(pivotFileStore, fileHash)
+ log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+ retErrC <- err
+ }()
+
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
- }
- }()
+ }()
- //finally check that the pivot node gets all chunks via the root hash
- log.Debug("Check retrieval")
- success := true
- var total int64
- total, err = readAll(pivotFileStore, fileHash)
- if err != nil {
- return err
- }
- log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
- if err != nil || total != int64(size) {
- success = false
- }
+ //finally check that the pivot node gets all chunks via the root hash
+ log.Debug("Check retrieval")
+ success := true
+ var total int64
+ total, err = readAll(pivotFileStore, fileHash)
+ if err != nil {
+ return err
+ }
+ log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+ if err != nil || total != int64(size) {
+ success = false
+ }
- if !success {
- return fmt.Errorf("Test failed, chunks not available on all nodes")
- }
- if err := <-retErrC; err != nil {
- t.Fatalf("requesting chunks: %v", err)
+ if !success {
+ return fmt.Errorf("Test failed, chunks not available on all nodes")
+ }
+ if err := <-retErrC; err != nil {
+ return fmt.Errorf("requesting chunks: %v", err)
+ }
+ log.Debug("Test terminated successfully")
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
}
- log.Debug("Test terminated successfully")
- return nil
})
- if result.Error != nil {
- t.Fatal(result.Error)
- }
}
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
@@ -644,25 +614,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
- store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
@@ -670,12 +625,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -686,21 +643,22 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
b.Fatal(err)
}
- ctx := context.Background()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
node := nodeIDs[len(nodeIDs)-1]
item, ok := sim.NodeItem(node, bucketKeyFileStore)
if !ok {
- b.Fatal("No filestore")
+ return errors.New("No filestore")
}
remoteFileStore := item.(*storage.FileStore)
pivotNode := nodeIDs[0]
item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
if !ok {
- b.Fatal("No filestore")
+ return errors.New("No netstore")
}
netStore := item.(*storage.NetStore)
@@ -708,26 +666,10 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
return err
}
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
+ disconnected := watchDisconnections(ctx, sim)
defer func() {
- if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
- err = errors.New("disconnect events received")
- }
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
}
}()
// benchmark loop
@@ -742,12 +684,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
ctx := context.TODO()
hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
if err != nil {
- b.Fatalf("expected no error. got %v", err)
+ return fmt.Errorf("store: %v", err)
}
// wait until all chunks stored
err = wait(ctx)
if err != nil {
- b.Fatalf("expected no error. got %v", err)
+ return fmt.Errorf("wait store: %v", err)
}
// collect the hashes
hashes[i] = hash
@@ -783,10 +725,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
break Loop
}
}
- if err != nil {
- b.Fatal(err)
- }
- return nil
+ return err
})
if result.Error != nil {
b.Fatal(result.Error)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 8f2bed9d6..009a941ef 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -21,9 +21,7 @@ import (
"encoding/binary"
"errors"
"fmt"
- "os"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -31,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -62,26 +59,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
externalStreamMaxKeys := uint64(100)
sim := simulation.New(map[string]simulation.ServiceFunc{
- "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
@@ -97,11 +79,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})
- fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup := func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -134,13 +117,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
_, wait, err := fileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
if err != nil {
- log.Error("Store error: %v", "err", err)
- t.Fatal(err)
+ return fmt.Errorf("store: %v", err)
}
err = wait(ctx)
if err != nil {
- log.Error("Wait error: %v", "err", err)
- t.Fatal(err)
+ return fmt.Errorf("wait store: %v", err)
}
item, ok = sim.NodeItem(checker, bucketKeyRegistry)
@@ -152,32 +133,15 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
liveErrC := make(chan error)
historyErrC := make(chan error)
- log.Debug("Watching for disconnections")
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
if err != nil {
return err
}
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
+ disconnected := watchDisconnections(ctx, sim)
defer func() {
- if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
- err = errors.New("disconnect events received")
- }
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
}
}()
diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go
index 65cde2411..501660fab 100644
--- a/swarm/network/stream/lightnode_test.go
+++ b/swarm/network/stream/lightnode_test.go
@@ -28,11 +28,11 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
Retrieval: RetrievalClientOnly,
Syncing: SyncingDisabled,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -67,11 +67,11 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -111,11 +111,11 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
@@ -156,11 +156,11 @@ func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
}
- tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
- defer teardown()
+ tester, _, _, teardown, err := newStreamerTester(registryOptions)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
node := tester.Nodes[0]
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index b293724cc..de4e8a3bb 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -300,7 +300,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- err := p.SendPriority(ctx, msg, c.priority)
+ err := p.SendPriority(ctx, msg, c.priority, "")
if err != nil {
log.Warn("SendPriority error", "err", err)
}
diff --git a/swarm/network/stream/norace_test.go b/swarm/network/stream/norace_test.go
new file mode 100644
index 000000000..b324f6939
--- /dev/null
+++ b/swarm/network/stream/norace_test.go
@@ -0,0 +1,24 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build !race
+
+package stream
+
+// Provide a flag to reduce the scope of tests when running them
+// with the race detector. Some of the tests do a lot of allocations
+// on the heap, and the race detector uses much more memory to track them.
+const raceTest = false
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 4bccf56f5..68da8f44a 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -65,6 +65,7 @@ type Peer struct {
// on creating a new client in offered hashes handler.
clientParams map[Stream]*clientParams
quit chan struct{}
+ spans sync.Map
}
type WrappedPriorityMsg struct {
@@ -82,10 +83,16 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}),
+ spans: sync.Map{},
}
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
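+ // finish and remove the tracing span, if any, that SendPriority stored for this message's context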
+ defer p.spans.Delete(wmsg.Context)
+ sp, ok := p.spans.Load(wmsg.Context)
+ if ok {
+ defer sp.(opentracing.Span).Finish()
+ }
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
@@ -130,7 +137,6 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
// Deliver sends a storeRequestMsg protocol message to the peer
// Depending on the `syncing` parameter we send different message types
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
- var sp opentracing.Span
var msg interface{}
spanName := "send.chunk.delivery"
@@ -151,18 +157,22 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
}
spanName += ".retrieval"
}
- ctx, sp = spancontext.StartSpan(
- ctx,
- spanName)
- defer sp.Finish()
- return p.SendPriority(ctx, msg, priority)
+ return p.SendPriority(ctx, msg, priority, spanName)
}
// SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8, traceId string) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
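+ // when a trace name is given, open a span here; it is finished after the wrapped
+ // message is sent from the priority queue (see the pq.Run callback in NewPeer)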
+ if traceId != "" {
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ traceId,
+ )
+ p.spans.Store(ctx, sp)
+ }
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
@@ -205,7 +215,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
- return p.SendPriority(ctx, msg, s.priority)
+ return p.SendPriority(ctx, msg, s.priority, "send.offered.hashes")
}
func (p *Peer) getServer(s Stream) (*server, error) {
diff --git a/swarm/network/stream/race_test.go b/swarm/network/stream/race_test.go
new file mode 100644
index 000000000..8aed3542b
--- /dev/null
+++ b/swarm/network/stream/race_test.go
@@ -0,0 +1,23 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build race
+
+package stream
+
+// Reduce the scope of some tests when running with the race detector,
+// as it raises memory consumption significantly.
+const raceTest = true
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index d345ac8d0..afb023ae2 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -18,7 +18,6 @@ package stream
import (
"context"
"fmt"
- "os"
"sync"
"testing"
"time"
@@ -27,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -76,7 +74,7 @@ func TestRetrieval(t *testing.T) {
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
- err := runRetrievalTest(*chunks, *nodes)
+ err := runRetrievalTest(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
@@ -95,53 +93,37 @@ func TestRetrieval(t *testing.T) {
}
for _, n := range nodeCnt {
for _, c := range chnkCnt {
- err := runRetrievalTest(c, n)
- if err != nil {
- t.Fatal(err)
- }
+ t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
+ err := runRetrievalTest(t, c, n)
+ if err != nil {
+ t.Fatal(err)
+ }
+ })
}
}
}
}
var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
- "streamer": retrievalStreamerFunc,
-}
-
-func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
-
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalEnabled,
- Syncing: SyncingAutoSubscribe,
- SyncUpdateDelay: 3 * time.Second,
- }, nil)
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe,
+ SyncUpdateDelay: 3 * time.Second,
+ }, nil)
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
- return r, cleanup, nil
+ return r, cleanup, nil
+ },
}
/*
@@ -171,7 +153,7 @@ func runFileRetrievalTest(nodeCount int) error {
return err
}
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancelSimRun()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
@@ -245,7 +227,8 @@ simulation's `action` function.
The snapshot should have 'streamer' in its service list.
*/
-func runRetrievalTest(chunkCount int, nodeCount int) error {
+func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
+ t.Helper()
sim := simulation.New(retrievalSimServiceMap)
defer sim.Close()
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 6af19c12a..b45d0aed5 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -17,11 +17,12 @@ package stream
import (
"context"
+ "errors"
"fmt"
+ "io/ioutil"
"os"
"runtime"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -92,6 +93,15 @@ func TestSyncingViaGlobalSync(t *testing.T) {
if *longrunning {
chnkCnt = []int{1, 8, 32, 256, 1024}
nodeCnt = []int{16, 32, 64, 128, 256}
+ } else if raceTest {
+ // TestSyncingViaGlobalSync allocates a lot of memory
+ // with the race detector. By reducing the number of chunks
+ // and nodes, memory consumption is lower and data races
+ // are still checked, while correctness of syncing is
+ // tested with more chunks and nodes in regular (!race)
+ // tests.
+ chnkCnt = []int{4}
+ nodeCnt = []int{16}
} else {
//default test
chnkCnt = []int{4, 32}
@@ -107,42 +117,43 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}
var simServiceMap = map[string]simulation.ServiceFunc{
- "streamer": streamerFunc,
-}
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
+ if err != nil {
+ return nil, nil, err
+ }
-func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
-
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- Retrieval: RetrievalDisabled,
- Syncing: SyncingAutoSubscribe,
- SyncUpdateDelay: 3 * time.Second,
- }, nil)
-
- bucket.Store(bucketKeyRegistry, r)
-
- cleanup = func() {
- os.RemoveAll(datadir)
- netStore.Close()
- r.Close()
- }
+ var dir string
+ var store *state.DBStore
+ if raceTest {
+ // Use on-disk DBStore to reduce memory consumption in race tests.
+ dir, err = ioutil.TempDir("", "swarm-stream-")
+ if err != nil {
+ return nil, nil, err
+ }
+ store, err = state.NewDBStore(dir)
+ if err != nil {
+ return nil, nil, err
+ }
+ } else {
+ store = state.NewInmemoryStore()
+ }
+
+ r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingAutoSubscribe,
+ SyncUpdateDelay: 3 * time.Second,
+ }, nil)
- return r, cleanup, nil
+ bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
}
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
@@ -171,36 +182,24 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
t.Fatal(err)
}
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
-
result := runSim(conf, ctx, sim, chunkCount)
if result.Error != nil {
t.Fatal(result.Error)
}
- if yes, ok := disconnected.Load().(bool); ok && yes {
- t.Fatal("disconnect events received")
- }
log.Info("Simulation ended")
}
func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result {
- return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
+ }
+ }()
+
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
//get the kademlia overlay address from this ID
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index fb571c856..cb5912185 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -359,7 +359,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(context.TODO(), msg, priority)
+ return peer.SendPriority(context.TODO(), msg, priority, "")
}
func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
@@ -516,6 +516,11 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod
// nil as base takes the node's base; we need to pass 255 as `EachConn` runs
// from deepest bins backwards
kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
+ // nodes that do not provide the stream protocol
+ // should not be subscribed to, e.g. bootnodes
+ if !p.HasCap("stream") {
+ return true
+ }
//if the peer's bin is shallower than the kademlia depth,
//only the peer's bin should be subscribed
if po < kadDepth {
@@ -725,7 +730,8 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
if err != nil {
return err
}
- if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
+
+ if err := p.SendPriority(context.TODO(), tp, c.priority, ""); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
@@ -929,3 +935,33 @@ func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, prior
func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
return api.streamer.Unsubscribe(peerId, s)
}
+
+/*
+GetPeerSubscriptions is an API function which allows querying a peer for the stream subscriptions it has.
+It can be called via RPC.
+It returns a map of node IDs with an array of string representations of Stream objects.
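+
+An RPC usage sketch, mirroring how the streamer tests call it:
+
+	pstreams := make(map[string][]string)
+	err := client.Call(&pstreams, "stream_getPeerSubscriptions")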
+*/
+func (api *API) GetPeerSubscriptions() map[string][]string {
+ //create the empty map
+ pstreams := make(map[string][]string)
+
+ //iterate all streamer peers
+ api.streamer.peersMu.RLock()
+ defer api.streamer.peersMu.RUnlock()
+
+ for id, p := range api.streamer.peers {
+ var streams []string
+ //every peer has a map of stream servers
+ //every stream server represents a subscription
+ p.serverMu.RLock()
+ for s := range p.servers {
+ //append the string representation of the stream
+ //to the list for this peer
+ streams = append(streams, s.String())
+ }
+ p.serverMu.RUnlock()
+ //set the array of stream servers to the map
+ pstreams[id.String()] = streams
+ }
+ return pstreams
+}
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index cdaeb92d0..e92ee3783 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -22,23 +22,29 @@ import (
"errors"
"fmt"
"strconv"
+ "strings"
+ "sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
+ "github.com/ethereum/go-ethereum/swarm/state"
"golang.org/x/crypto/sha3"
)
func TestStreamerSubscribe(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top)
@@ -48,11 +54,11 @@ func TestStreamerSubscribe(t *testing.T) {
}
func TestStreamerRequestSubscription(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top)
@@ -139,11 +145,11 @@ func (self *testServer) Close() {
}
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
return newTestClient(t), nil
@@ -232,11 +238,11 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", false)
@@ -299,11 +305,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -365,11 +371,11 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
}
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -409,11 +415,11 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -472,11 +478,11 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -537,11 +543,11 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
stream := NewStream("foo", "", true)
@@ -636,11 +642,11 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
}
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
- defer teardown()
+ tester, streamer, _, teardown, err := newStreamerTester(nil)
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 10), nil
@@ -769,15 +775,15 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
// leaving place for new streams.
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -845,13 +851,13 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
// error message exchange.
func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
var maxPeerServers = 6
- tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{
MaxPeerServers: maxPeerServers,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t, 0), nil
@@ -930,14 +936,14 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
//TestHasPriceImplementation is to check that the Registry has a
//`Price` interface implementation
func TestHasPriceImplementation(t *testing.T) {
- _, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ _, r, _, teardown, err := newStreamerTester(&RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
})
- defer teardown()
if err != nil {
t.Fatal(err)
}
+ defer teardown()
if r.prices == nil {
t.Fatal("No prices implementation available for the stream protocol")
@@ -1105,7 +1111,6 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
}
}
-
// print some output
for p, subs := range fakeSubscriptions {
log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p))
@@ -1114,3 +1119,239 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
}
}
+
+// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function
+func TestGetSubscriptions(t *testing.T) {
+ // create a number of dummy peers
+ testPeerCount := 8
+ // every peer will have this amount of dummy servers
+ testServerCount := 4
+ // the peerMap which will store this data for the registry
+ peerMap := make(map[enode.ID]*Peer)
+ // create the registry
+ r := &Registry{}
+ api := NewAPI(r)
+ // call once, at this point should be empty
+ regs := api.GetPeerSubscriptions()
+ if len(regs) != 0 {
+ t.Fatal("Expected subscription count to be 0, but it is not")
+ }
+
+ // now create a number of dummy servers for each node
+ for i := 0; i < testPeerCount; i++ {
+ addr := network.RandomAddr()
+ id := addr.ID()
+ p := &Peer{}
+ p.servers = make(map[Stream]*server)
+ for k := 0; k < testServerCount; k++ {
+ s := Stream{
+ Name: strconv.Itoa(k),
+ Key: "",
+ Live: false,
+ }
+ p.servers[s] = &server{}
+ }
+ peerMap[id] = p
+ }
+ r.peers = peerMap
+
+ // call the subscriptions again
+ regs = api.GetPeerSubscriptions()
+ // count how many (fake) subscriptions there are
+ cnt := 0
+ for _, reg := range regs {
+ for range reg {
+ cnt++
+ }
+ }
+ // check expected value
+ expectedCount := testPeerCount * testServerCount
+ if cnt != expectedCount {
+ t.Fatalf("Expected %d subscriptions, but got %d", expectedCount, cnt)
+ }
+}
+
+/*
+TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
+starts the simulation, waits for SyncUpdateDelay in order to kick off
+stream registration, then tests that there are subscriptions.
+*/
+func TestGetSubscriptionsRPC(t *testing.T) {
+
+ // arbitrarily set to 4
+ nodeCount := 4
+ // run with more nodes if `longrunning` flag is set
+ if *longrunning {
+ nodeCount = 64
+ }
+ // set the syncUpdateDelay for sync registrations to start
+ syncUpdateDelay := 200 * time.Millisecond
+ // holds the msg code for SubscribeMsg
+ var subscribeMsgCode uint64
+ var ok bool
+ var expectedMsgCount counter
+
+ // this channel signals that the expected number of subscriptions has been made
+ allSubscriptionsDone := make(chan struct{})
+ // after the test, we need to reset the subscriptionFunc to the default
+ defer func() { subscriptionFunc = doRequestSubscription }()
+
+ // we use this subscriptionFunc for this test: it just increments the count and calls the actual subscription function
+ subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
+ expectedMsgCount.inc()
+ doRequestSubscription(r, p, bin, subs)
+ return true
+ }
+ // create a standard sim
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // configure so that sync registrations actually happen
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe, //enable sync registrations
+ SyncUpdateDelay: syncUpdateDelay,
+ }, nil)
+
+ // get the SubscribeMsg code
+ subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
+ if !ok {
+ t.Fatal("Message code for SubscribeMsg not found")
+ }
+
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
+
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ defer cancelSimRun()
+
+ // upload a snapshot
+ err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // setup the filter for SubscribeMsg
+ msgs := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
+ )
+
+ // strategy: listen to all SubscribeMsg events; after every event we wait.
+ // If no more messages are received within `waitDuration`, we assume the
+ // subscription phase has terminated.
+
+ // the loop in this goroutine will either wait for new message events
+ // or time out after `waitDuration`, which signals that we are not receiving
+ // any new subscriptions any more
+ go func() {
+ //for long running sims, waiting 1 sec will not be enough
+ waitDuration := time.Duration(nodeCount/16) * time.Second
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case m := <-msgs: // just reset the loop
+ if m.Error != nil {
+ log.Error("stream message", "err", m.Error)
+ continue
+ }
+ log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID)
+ case <-time.After(waitDuration):
+ // the wait period passed without new messages, don't assume more subscriptions
+ allSubscriptionsDone <- struct{}{}
+ log.Info("All subscriptions received")
+ return
+
+ }
+ }
+ }()
+
+ //run the simulation
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ log.Info("Simulation running")
+ nodes := sim.Net.Nodes
+
+ //wait until all subscriptions are done
+ select {
+ case <-allSubscriptionsDone:
+ case <-ctx.Done():
+ return errors.New("Context timed out")
+ }
+
+ log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount.count())
+ //now iterate again, this time we call each node via RPC to get its subscriptions
+ realCount := 0
+ for _, node := range nodes {
+ //create rpc client
+ client, err := node.Client()
+ if err != nil {
+ return fmt.Errorf("create node rpc client fail: %v", err)
+ }
+
+ //ask it for subscriptions
+ pstreams := make(map[string][]string)
+ err = client.Call(&pstreams, "stream_getPeerSubscriptions")
+ if err != nil {
+ return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
+ }
+ //the number of subscriptions cannot be smaller than the number of peers
+ log.Debug("node subscriptions", "node", node.String())
+ for p, ps := range pstreams {
+ log.Debug("... with", "peer", p)
+ for _, s := range ps {
+ log.Debug(".......", "stream", s)
+ // each node also has subscriptions to RETRIEVE_REQUEST streams,
+ // we need to ignore those, we are only counting SYNC streams
+ if !strings.HasPrefix(s, "RETRIEVE_REQUEST") {
+ realCount++
+ }
+ }
+ }
+ }
+ // every pair of nodes is mutually subscribed to each other, so the actual count is half of the total
+ emc := expectedMsgCount.count()
+ if realCount/2 != emc {
+ return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
+ }
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+}
+
+// counter is used to concurrently increment
+// and read an integer value.
+type counter struct {
+ v int
+ mu sync.RWMutex
+}
+
+// Increment the counter.
+func (c *counter) inc() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.v++
+}
+
+// Read the counter value.
+func (c *counter) count() int {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ return c.v
+}
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index 014ec9a98..be0752a9d 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -24,7 +24,6 @@ import (
"math"
"os"
"sync"
- "sync/atomic"
"testing"
"time"
@@ -38,7 +37,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
- mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
)
@@ -46,9 +44,15 @@ const dataChunkCount = 200
func TestSyncerSimulation(t *testing.T) {
testSyncBetweenNodes(t, 2, dataChunkCount, true, 1)
- testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
- testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
- testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
+ // This test uses much more memory when running with
+ // the race detector. Allow it to finish successfully by
+ // reducing its scope, and still check for data races
+ // with the smallest number of nodes.
+ if !raceTest {
+ testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
+ testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
+ testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
+ }
}
func createMockStore(globalStore mock.GlobalStorer, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) {
@@ -73,50 +77,46 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
+ addr := network.NewAddr(ctx.Config.Node())
//hack to put addresses in same space
addr.OAddr[0] = byte(0)
- if *useMockStore {
- store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr)
- } else {
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
- }
+ netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- bucket.Store(bucketKeyDelivery, delivery)
+ var dir string
+ var store *state.DBStore
+ if raceTest {
+ // Use on-disk DBStore to reduce memory consumption in race tests.
+ dir, err = ioutil.TempDir("", "swarm-stream-")
+ if err != nil {
+ return nil, nil, err
+ }
+ store, err = state.NewDBStore(dir)
+ if err != nil {
+ return nil, nil, err
+ }
+ } else {
+ store = state.NewInmemoryStore()
+ }
- r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
}, nil)
- fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
- bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ r.Close()
+ clean()
+ if dir != "" {
+ os.RemoveAll(dir)
+ }
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -139,26 +139,10 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
nodeIndex[id] = i
}
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- var disconnected atomic.Value
- go func() {
- for d := range disconnections {
- if d.Error != nil {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- disconnected.Store(true)
- }
- }
- }()
+ disconnected := watchDisconnections(ctx, sim)
defer func() {
- if err != nil {
- if yes, ok := disconnected.Load().(bool); ok && yes {
- err = errors.New("disconnect events received")
- }
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
}
}()
@@ -167,7 +151,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
id := nodeIDs[j]
client, err := sim.Net.GetNode(id).Client()
if err != nil {
- t.Fatal(err)
+ return fmt.Errorf("node %s client: %v", id, err)
}
sid := nodeIDs[j+1]
client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
@@ -183,7 +167,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
size := chunkCount * chunkSize
_, wait, err := fileStore.Store(ctx, testutil.RandomReader(j, size), int64(size), false)
if err != nil {
- t.Fatal(err.Error())
+ return fmt.Errorf("fileStore.Store: %v", err)
}
wait(ctx)
}
@@ -251,44 +235,26 @@ func TestSameVersionID(t *testing.T) {
v := uint(1)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
-
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
+
//assign to each node the same version ID
r.spec.Version = v
- bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -316,7 +282,7 @@ func TestSameVersionID(t *testing.T) {
//the peers should connect, thus getting the peer should not return nil
if registry.getPeer(nodes[1]) == nil {
- t.Fatal("Expected the peer to not be nil, but it is")
+ return errors.New("Expected the peer to not be nil, but it is")
}
return nil
})
@@ -333,46 +299,27 @@ func TestDifferentVersionID(t *testing.T) {
v := uint(0)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- var store storage.ChunkStore
- var datadir string
-
- node := ctx.Config.Node()
- addr := network.NewAddr(node)
-
- store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
- if err != nil {
- return nil, nil, err
- }
- bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- store.Close()
- os.RemoveAll(datadir)
- }
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyDB, netStore)
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
- bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
}, nil)
+ bucket.Store(bucketKeyRegistry, r)
//increase the version ID for each node
v++
r.spec.Version = v
- bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
return r, cleanup, nil
-
},
})
defer sim.Close()
@@ -400,7 +347,7 @@ func TestDifferentVersionID(t *testing.T) {
//getting the other peer should fail due to the different version numbers
if registry.getPeer(nodes[1]) != nil {
- t.Fatal("Expected the peer to be nil, but it is not")
+ return errors.New("Expected the peer to be nil, but it is not")
}
return nil
})
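
The service functions above now funnel their per-node setup through newNetStoreAndDelivery, another shared test helper not shown in this excerpt. Reconstructed from the boilerplate it replaces, it plausibly has roughly this shape; the signature, return order, and bucket keys are assumptions taken from the call sites:

package stream

import (
    "os"
    "sync"

    "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
    "github.com/ethereum/go-ethereum/swarm/network"
    "github.com/ethereum/go-ethereum/swarm/storage"
)

// newNetStoreAndDelivery reproduces the per-node setup that each service
// function used to inline: local store, NetStore, Kademlia, Delivery and the
// fetcher factory, plus a cleanup function that tears the storage down.
func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
    n := ctx.Config.Node()
    addr := network.NewAddr(n)

    store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
    if err != nil {
        return nil, nil, nil, nil, err
    }
    localStore := store.(*storage.LocalStore)
    netStore, err := storage.NewNetStore(localStore, nil)
    if err != nil {
        store.Close()
        os.RemoveAll(datadir)
        return nil, nil, nil, nil, err
    }

    kad := network.NewKademlia(addr.Over(), network.NewKadParams())
    delivery := NewDelivery(kad, netStore)
    netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

    bucket.Store(bucketKeyStore, store)
    bucket.Store(bucketKeyDB, netStore)
    bucket.Store(bucketKeyDelivery, delivery)
    bucket.Store(bucketKeyFileStore, storage.NewFileStore(netStore, storage.NewFileStoreParams()))

    cleanup := func() {
        netStore.Close()
        os.RemoveAll(datadir)
    }
    return addr, netStore, delivery, cleanup, nil
}
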
diff --git a/swarm/network/stream/testing/snapshot_4.json b/swarm/network/stream/testing/snapshot_4.json
new file mode 100644
index 000000000..a8b617407
--- /dev/null
+++ b/swarm/network/stream/testing/snapshot_4.json
@@ -0,0 +1 @@
+{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]}
diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
index 18b4c8fb0..cf4405ec1 100644
--- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go
+++ b/swarm/network/stream/visualized_snapshot_sync_sim_test.go
@@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"io"
- "os"
"sync"
"testing"
"time"
@@ -37,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -68,31 +66,6 @@ func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulati
return nodeCount, chunkCount, sim
}
-//watch for disconnections and wait for healthy
-func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) {
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
-
- if _, err := sim.WaitTillHealthy(ctx); err != nil {
- panic(err)
- }
-
- disconnections := sim.PeerEvents(
- context.Background(),
- sim.NodeIDs(),
- simulation.NewPeerEventsFilter().Drop(),
- )
-
- go func() {
- for d := range disconnections {
- log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
- panic("unexpected disconnect")
- cancelSimRun()
- }
- }()
-
- return ctx, cancelSimRun
-}
-
//This test requests bogus hashes into the network
func TestNonExistingHashesWithServer(t *testing.T) {
@@ -104,19 +77,25 @@ func TestNonExistingHashesWithServer(t *testing.T) {
panic(err)
}
- ctx, cancelSimRun := watchSim(sim)
- defer cancelSimRun()
-
//in order to get some meaningful visualization, it is beneficial
//to define a minimum duration of this test
testDuration := 20 * time.Second
- result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
+ }
+ }()
+
//check on the node's FileStore (netstore)
id := sim.Net.GetRandomUpNode().ID()
item, ok := sim.NodeItem(id, bucketKeyFileStore)
if !ok {
- t.Fatalf("No filestore")
+ return errors.New("No filestore")
}
fileStore := item.(*storage.FileStore)
//create a bogus hash
@@ -171,21 +150,10 @@ func TestSnapshotSyncWithServer(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
- n := ctx.Config.Node()
- addr := network.NewAddr(n)
- store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+ addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
if err != nil {
return nil, nil, err
}
- bucket.Store(bucketKeyStore, store)
- localStore := store.(*storage.LocalStore)
- netStore, err := storage.NewNetStore(localStore, nil)
- if err != nil {
- return nil, nil, err
- }
- kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, netStore)
- netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
@@ -201,9 +169,8 @@ func TestSnapshotSyncWithServer(t *testing.T) {
bucket.Store(bucketKeyRegistry, tr)
cleanup = func() {
- netStore.Close()
tr.Close()
- os.RemoveAll(datadir)
+ clean()
}
return tr, cleanup, nil
@@ -229,9 +196,6 @@ func TestSnapshotSyncWithServer(t *testing.T) {
panic(err)
}
- ctx, cancelSimRun := watchSim(sim)
- defer cancelSimRun()
-
//run the sim
result := runSim(conf, ctx, sim, chunkCount)
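
The service function above passes dummyRequestFromPeers into newNetStoreAndDeliveryWithRequestFunc so that syncing is exercised without triggering real retrieval. Neither helper appears in this excerpt; a plausible sketch, with the signature matching the network.RequestFunc type the fetcher factory expects:

package stream

import (
    "context"

    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/ethereum/go-ethereum/swarm/network"
)

// dummyRequestFromPeers satisfies network.RequestFunc but never forwards a
// request to any peer, so fetchers created during the test simply idle until
// their context expires.
func dummyRequestFromPeers(_ context.Context, _ *network.Request) (*enode.ID, chan struct{}, error) {
    return nil, nil, nil
}

// newNetStoreAndDeliveryWithRequestFunc is assumed to mirror
// newNetStoreAndDelivery, except that the supplied request function, rather
// than delivery.RequestFromPeers, is wired into the fetcher factory:
//
//	netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
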