aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/intervals_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/intervals_test.go')
-rw-r--r--swarm/network/stream/intervals_test.go78
1 files changed, 43 insertions, 35 deletions
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index f4294134b..452aaca76 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -38,13 +38,18 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)
-func TestIntervals(t *testing.T) {
+func TestIntervalsLive(t *testing.T) {
testIntervals(t, true, nil, false)
- testIntervals(t, false, NewRange(9, 26), false)
- testIntervals(t, true, NewRange(9, 26), false)
-
testIntervals(t, true, nil, true)
+}
+
+func TestIntervalsHistory(t *testing.T) {
+ testIntervals(t, false, NewRange(9, 26), false)
testIntervals(t, false, NewRange(9, 26), true)
+}
+
+func TestIntervalsLiveAndHistory(t *testing.T) {
+ testIntervals(t, true, NewRange(9, 26), false)
testIntervals(t, true, NewRange(9, 26), true)
}
@@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return newTestExternalClient(db), nil
+ return newTestExternalClient(netStore), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
@@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
t.Fatal(err)
}
- ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ t.Fatal(err)
+ }
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
storer := nodeIDs[0]
@@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
liveErrC := make(chan error)
historyErrC := make(chan error)
- if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
- log.Error("WaitKademlia error: %v", "err", err)
- return err
- }
-
log.Debug("Watching for disconnections")
disconnections := sim.PeerEvents(
context.Background(),
@@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
+ err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
+ if err != nil {
+ return err
+ }
+
go func() {
for d := range disconnections {
if d.Error != nil {
@@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var liveHashesChan chan []byte
liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
- log.Error("Subscription error: %v", "err", err)
+ log.Error("get hashes", "err", err)
return
}
i := externalStreamSessionAt
@@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var historyHashesChan chan []byte
historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
if err != nil {
+ log.Error("get hashes", "err", err)
return
}
@@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}
}()
- err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
- if err != nil {
- return err
- }
if err := <-liveErrC; err != nil {
return err
}
@@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error {
type testExternalClient struct {
hashes chan []byte
- db *storage.DBAPI
+ store storage.SyncChunkStore
enableNotificationsC chan struct{}
}
-func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
+func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
- db: db,
+ store: store,
enableNotificationsC: make(chan struct{}),
}
}
-func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
- chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
- if chunk.ReqC == nil {
+func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
+ wait := c.store.FetchFunc(ctx, storage.Address(hash))
+ if wait == nil {
return nil
}
- c.hashes <- hash
- //NOTE: This was failing on go1.9.x with a deadlock.
- //Sometimes this function would just block
- //It is commented now, but it may be well worth after the chunk refactor
- //to re-enable this and see if the problem has been addressed
- /*
- return func() {
- return chunk.WaitToStore()
+ select {
+ case c.hashes <- hash:
+ case <-ctx.Done():
+ log.Warn("testExternalClient NeedData context", "err", ctx.Err())
+ return func(_ context.Context) error {
+ return ctx.Err()
}
- */
- return nil
+ }
+ return wait
}
func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {