aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/common_test.go17
-rw-r--r--swarm/network/stream/delivery.go231
-rw-r--r--swarm/network/stream/delivery_test.go93
-rw-r--r--swarm/network/stream/intervals_test.go78
-rw-r--r--swarm/network/stream/messages.go87
-rw-r--r--swarm/network/stream/peer.go51
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go46
-rw-r--r--swarm/network/stream/snapshot_sync_test.go170
-rw-r--r--swarm/network/stream/stream.go28
-rw-r--r--swarm/network/stream/streamer_test.go8
-rw-r--r--swarm/network/stream/syncer.go94
-rw-r--r--swarm/network/stream/syncer_test.go32
12 files changed, 504 insertions, 431 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 491dc9fd5..e0d776e34 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
return nil, nil, nil, removeDataDir, err
}
- db := storage.NewDBAPI(localStore)
- delivery := NewDelivery(to, db)
- streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, nil, removeDataDir, err
+ }
+
+ delivery := NewDelivery(to, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+ streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
teardown := func() {
streamer.Close()
removeDataDir()
@@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
-func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) {
+func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
-func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) {
+func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
- rrs.stores[idx].Put(ctx, chunk)
+ return rrs.stores[idx].Put(ctx, chunk)
}
func (rrs *roundRobinStore) Close() {
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 627352535..d0f27eebc 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -19,12 +19,11 @@ package stream
import (
"context"
"errors"
- "time"
- "github.com/ethereum/go-ethereum/common"
+ "fmt"
+
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/discover"
- cp "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -46,39 +45,34 @@ var (
)
type Delivery struct {
- db *storage.DBAPI
- kad *network.Kademlia
- receiveC chan *ChunkDeliveryMsg
- getPeer func(discover.NodeID) *Peer
+ chunkStore storage.SyncChunkStore
+ kad *network.Kademlia
+ getPeer func(discover.NodeID) *Peer
}
-func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery {
- d := &Delivery{
- db: db,
- kad: kad,
- receiveC: make(chan *ChunkDeliveryMsg, deliveryCap),
+func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
+ return &Delivery{
+ chunkStore: chunkStore,
+ kad: kad,
}
-
- go d.processReceivedChunks()
- return d
}
// SwarmChunkServer implements Server
type SwarmChunkServer struct {
deliveryC chan []byte
batchC chan []byte
- db *storage.DBAPI
+ chunkStore storage.ChunkStore
currentLen uint64
quit chan struct{}
}
// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer {
+func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
s := &SwarmChunkServer{
- deliveryC: make(chan []byte, deliveryCap),
- batchC: make(chan []byte),
- db: db,
- quit: make(chan struct{}),
+ deliveryC: make(chan []byte, deliveryCap),
+ batchC: make(chan []byte),
+ chunkStore: chunkStore,
+ quit: make(chan struct{}),
}
go s.processDeliveries()
return s
@@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() {
// GetData retrives chunk data from db store
func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
@@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
return err
}
streamer := s.Server.(*SwarmChunkServer)
- chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr)
- if chunk.ReqC != nil {
- if created {
- if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil {
- log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err)
- chunk.SetErrored(storage.ErrChunkForward)
- return nil
- }
+
+ var cancel func()
+ // TODO: do something with this hardcoded timeout, maybe use TTL in the future
+ ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
+
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-streamer.quit:
}
- go func() {
- var osp opentracing.Span
- ctx, osp = spancontext.StartSpan(
- ctx,
- "waiting.delivery")
- defer osp.Finish()
-
- t := time.NewTimer(10 * time.Minute)
- defer t.Stop()
-
- log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created)
- start := time.Now()
- select {
- case <-chunk.ReqC:
- log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start))
- case <-t.C:
- log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr)
- chunk.SetErrored(storage.ErrChunkTimeout)
- return
- }
- chunk.SetErrored(nil)
-
- if req.SkipCheck {
- err := sp.Deliver(ctx, chunk, s.priority)
- if err != nil {
- log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
- sp.Drop(err)
- }
+ cancel()
+ }()
+
+ go func() {
+ chunk, err := d.chunkStore.Get(ctx, req.Addr)
+ if err != nil {
+ log.Warn("ChunkStore.Get can not retrieve chunk", "err", err)
+ return
+ }
+ if req.SkipCheck {
+ err = sp.Deliver(ctx, chunk, s.priority)
+ if err != nil {
+ log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
- streamer.deliveryC <- chunk.Addr[:]
- }()
- return nil
- }
- // TODO: call the retrieve function of the outgoing syncer
- if req.SkipCheck {
- log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr)
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr)
+ return
}
- return sp.Deliver(ctx, chunk, s.priority)
- }
- streamer.deliveryC <- chunk.Addr[:]
+ select {
+ case streamer.deliveryC <- chunk.Address()[:]:
+ case <-streamer.quit:
+ }
+
+ }()
+
return nil
}
@@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct {
peer *Peer // set in handleChunkDeliveryMsg
}
+// TODO: Fix context SNAFU
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
@@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
"chunk.delivery")
defer osp.Finish()
- req.peer = sp
- d.receiveC <- req
- return nil
-}
+ processReceivedChunksCount.Inc(1)
-func (d *Delivery) processReceivedChunks() {
-R:
- for req := range d.receiveC {
- processReceivedChunksCount.Inc(1)
-
- if len(req.SData) > cp.DefaultSize+8 {
- log.Warn("received chunk is bigger than expected", "len", len(req.SData))
- continue R
- }
-
- // this should be has locally
- chunk, err := d.db.Get(context.TODO(), req.Addr)
- if err == nil {
- continue R
- }
- if err != storage.ErrFetching {
- log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk)
- continue R
- }
- select {
- case <-chunk.ReqC:
- log.Error("someone else delivered?", "hash", chunk.Addr.Hex())
- continue R
- default:
- }
-
- chunk.SData = req.SData
- d.db.Put(context.TODO(), chunk)
-
- go func(req *ChunkDeliveryMsg) {
- err := chunk.WaitToStore()
+ go func() {
+ req.peer = sp
+ err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
+ if err != nil {
if err == storage.ErrChunkInvalid {
+ // we removed this log because it spams the logs
+ // TODO: Enable this log line
+ // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
req.peer.Drop(err)
}
- }(req)
- }
+ }
+ }()
+ return nil
}
// RequestFromPeers sends a chunk retrieve request to
-func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
- var success bool
- var err error
+func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
+ var sp *Peer
+ spID := req.Source
- d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool {
- spId := p.ID()
- for _, p := range peersToSkip {
- if p == spId {
- log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId)
+ if spID != nil {
+ sp = d.getPeer(*spID)
+ if sp == nil {
+ return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
+ }
+ } else {
+ d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
+ id := p.ID()
+ // TODO: skip light nodes that do not accept retrieve requests
+ if req.SkipPeer(id.String()) {
+ log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
}
- }
- sp := d.getPeer(spId)
+ sp = d.getPeer(id)
+ if sp == nil {
+ log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
+ return true
+ }
+ spID = &id
+ return false
+ })
if sp == nil {
- log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId)
- return true
+ return nil, nil, errors.New("no peer found")
}
- err = sp.SendPriority(ctx, &RetrieveRequestMsg{
- Addr: hash,
- SkipCheck: skipCheck,
- }, Top)
- if err != nil {
- return true
- }
- requestFromPeersEachCount.Inc(1)
- success = true
- return false
- })
- if success {
- return nil
}
- return errors.New("no peer found")
+
+ err := sp.SendPriority(ctx, &RetrieveRequestMsg{
+ Addr: req.Addr,
+ SkipCheck: req.SkipCheck,
+ }, Top)
+ if err != nil {
+ return nil, nil, err
+ }
+ requestFromPeersEachCount.Inc(1)
+
+ return spID, sp.quit, nil
}
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 972cc859a..ece54d4ee 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) {
peerID := tester.IDs[0]
- streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true)
+ ctx := context.Background()
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ streamer.delivery.RequestFromPeers(ctx, req)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{
Code: 5,
Msg: &RetrieveRequestMsg{
- Addr: chunk.Addr[:],
+ Addr: chunk.Address()[:],
},
Peer: peerID,
},
@@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
})
hash := storage.Address(hash0[:])
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = hash
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk := storage.NewChunk(hash, hash)
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
hash = storage.Address(hash1[:])
- chunk = storage.NewChunk(hash, nil)
- chunk.SData = hash1[:]
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk = storage.NewChunk(hash, hash1[:])
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
chunkKey := hash0[:]
chunkData := hash1[:]
- chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey)
-
- if !created {
- t.Fatal("chunk already exists")
- }
- select {
- case <-chunk.ReqC:
- t.Fatal("chunk is already received")
- default:
- }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
},
},
p2ptest.Exchange{
- Label: "ChunkDeliveryRequest message",
+ Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
{
Code: 6,
@@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
-
- timeout := time.NewTimer(1 * time.Second)
-
- select {
- case <-timeout.C:
- t.Fatal("timeout receiving chunk")
- case <-chunk.ReqC:
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ // wait for the chunk to get stored
+ storedChunk, err := localStore.Get(ctx, chunkKey)
+ for err != nil {
+ select {
+ case <-ctx.Done():
+ t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
+ default:
+ }
+ storedChunk, err = localStore.Get(ctx, chunkKey)
+ time.Sleep(50 * time.Millisecond)
}
- storedChunk, err := localStore.Get(context.TODO(), chunkKey)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
- if !bytes.Equal(storedChunk.SData, chunkData) {
+ if !bytes.Equal(storedChunk.Data(), chunkData) {
t.Fatal("Retrieved chunk has different data than original")
}
@@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
@@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
@@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
DoSync: true,
SyncUpdateDelay: 0,
})
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index f4294134b..452aaca76 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -38,13 +38,18 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)
-func TestIntervals(t *testing.T) {
+func TestIntervalsLive(t *testing.T) {
testIntervals(t, true, nil, false)
- testIntervals(t, false, NewRange(9, 26), false)
- testIntervals(t, true, NewRange(9, 26), false)
-
testIntervals(t, true, nil, true)
+}
+
+func TestIntervalsHistory(t *testing.T) {
+ testIntervals(t, false, NewRange(9, 26), false)
testIntervals(t, false, NewRange(9, 26), true)
+}
+
+func TestIntervalsLiveAndHistory(t *testing.T) {
+ testIntervals(t, true, NewRange(9, 26), false)
testIntervals(t, true, NewRange(9, 26), true)
}
@@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return newTestExternalClient(db), nil
+ return newTestExternalClient(netStore), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
@@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
t.Fatal(err)
}
- ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ t.Fatal(err)
+ }
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
storer := nodeIDs[0]
@@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
liveErrC := make(chan error)
historyErrC := make(chan error)
- if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
- log.Error("WaitKademlia error: %v", "err", err)
- return err
- }
-
log.Debug("Watching for disconnections")
disconnections := sim.PeerEvents(
context.Background(),
@@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
+ err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
+ if err != nil {
+ return err
+ }
+
go func() {
for d := range disconnections {
if d.Error != nil {
@@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var liveHashesChan chan []byte
liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
- log.Error("Subscription error: %v", "err", err)
+ log.Error("get hashes", "err", err)
return
}
i := externalStreamSessionAt
@@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var historyHashesChan chan []byte
historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
if err != nil {
+ log.Error("get hashes", "err", err)
return
}
@@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}
}()
- err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
- if err != nil {
- return err
- }
if err := <-liveErrC; err != nil {
return err
}
@@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error {
type testExternalClient struct {
hashes chan []byte
- db *storage.DBAPI
+ store storage.SyncChunkStore
enableNotificationsC chan struct{}
}
-func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
+func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
- db: db,
+ store: store,
enableNotificationsC: make(chan struct{}),
}
}
-func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
- chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
- if chunk.ReqC == nil {
+func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
+ wait := c.store.FetchFunc(ctx, storage.Address(hash))
+ if wait == nil {
return nil
}
- c.hashes <- hash
- //NOTE: This was failing on go1.9.x with a deadlock.
- //Sometimes this function would just block
- //It is commented now, but it may be well worth after the chunk refactor
- //to re-enable this and see if the problem has been addressed
- /*
- return func() {
- return chunk.WaitToStore()
+ select {
+ case c.hashes <- hash:
+ case <-ctx.Done():
+ log.Warn("testExternalClient NeedData context", "err", ctx.Err())
+ return func(_ context.Context) error {
+ return ctx.Err()
}
- */
- return nil
+ }
+ return wait
}
func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index a19f63589..2e1a81e82 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -18,9 +18,7 @@ package stream
import (
"context"
- "errors"
"fmt"
- "sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
@@ -31,6 +29,8 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
+var syncBatchTimeout = 30 * time.Second
+
// Stream defines a unique stream identifier.
type Stream struct {
// Name is used for Client and Server functions identification.
@@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
go func() {
if err := p.SendOfferedHashes(os, from, to); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
@@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
}
go func() {
if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
}
@@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if err != nil {
return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err)
}
- wg := sync.WaitGroup{}
+
+ ctr := 0
+ errC := make(chan error)
+ ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
+
+ ctx = context.WithValue(ctx, "source", p.ID().String())
for i := 0; i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
if wait := c.NeedData(ctx, hash); wait != nil {
+ ctr++
want.Set(i/HashSize, true)
- wg.Add(1)
// create request and wait until the chunk data arrives and is stored
- go func(w func()) {
- w()
- wg.Done()
+ go func(w func(context.Context) error) {
+ select {
+ case errC <- w(ctx):
+ case <-ctx.Done():
+ }
}(wait)
}
}
- // done := make(chan bool)
- // go func() {
- // wg.Wait()
- // close(done)
- // }()
- // go func() {
- // select {
- // case <-done:
- // s.next <- s.batchDone(p, req, hashes)
- // case <-time.After(1 * time.Second):
- // p.Drop(errors.New("timeout waiting for batch to be delivered"))
- // }
- // }()
+
go func() {
- wg.Wait()
+ defer cancel()
+ for i := 0; i < ctr; i++ {
+ select {
+ case err := <-errC:
+ if err != nil {
+ log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ return
+ }
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
+ return
+ case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ }
+ }
select {
case c.next <- c.batchDone(p, req, hashes):
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
}
}()
// only send wantedKeysMsg if all missing chunks of the previous batch arrived
@@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
c.sessionAt = req.From
}
from, to := c.nextBatch(req.To + 1)
- log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
+ log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID())
if from == to {
return nil
}
@@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
To: to,
}
go func() {
+ log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
- case <-time.After(120 * time.Second):
- log.Warn("handleOfferedHashesMsg timeout, so dropping peer")
- p.Drop(errors.New("handle offered hashes timeout"))
- return
case err := <-c.next:
if err != nil {
- log.Warn("c.next dropping peer", "err", err)
+ log.Warn("c.next error dropping peer", "err", err)
p.Drop(err)
return
}
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
- log.Warn("SendPriority err, so dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendPriority error", "err", err)
}
}()
return nil
@@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in go routine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "err", err)
}
}()
// go p.SendOfferedHashes(s, req.From, req.To)
@@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = data
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
- }
+ chunk := storage.NewChunk(hash, data)
if err := p.Deliver(ctx, chunk, s.priority); err != nil {
return err
}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 80b9ab711..1466a7a9c 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -33,8 +33,6 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
-var sendTimeout = 30 * time.Second
-
type notFoundError struct {
t string
s Stream
@@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
- p.Send(wmsg.Context, wmsg.Msg)
+ err := p.Send(wmsg.Context, wmsg.Msg)
+ if err != nil {
+ log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ }
})
+
+ // basic monitoring for pq contention
+ go func(pq *pq.PriorityQueue) {
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ var len_maxi int
+ var cap_maxi int
+ for k := range pq.Queues {
+ if len_maxi < len(pq.Queues[k]) {
+ len_maxi = len(pq.Queues[k])
+ }
+
+ if cap_maxi < cap(pq.Queues[k]) {
+ cap_maxi = cap(pq.Queues[k])
+ }
+ }
+
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi))
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi))
+ case <-p.quit:
+ return
+ }
+ }
+ }(p.pq)
+
go func() {
<-p.quit
cancel()
@@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error {
+func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
@@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
defer sp.Finish()
msg := &ChunkDeliveryMsg{
- Addr: chunk.Addr,
- SData: chunk.SData,
+ Addr: chunk.Address(),
+ SData: chunk.Data(),
}
return p.SendPriority(ctx, msg, priority)
}
@@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
- cctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
- defer cancel()
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
}
- return p.pq.Push(cctx, wmsg, int(priority))
+ err := p.pq.Push(wmsg, int(priority))
+ if err == pq.ErrContention {
+ log.Warn("dropping peer on priority queue contention", "peer", p.ID())
+ p.Drop(err)
+ }
+ return err
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 4ff947b21..19eaad34e 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
+
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
@@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
+
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 0,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucketKeyFileStore = simulation.BucketKey("filestore")
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 313019d6a..7cd09099c 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"os"
+ "runtime"
"sync"
"testing"
"time"
@@ -39,15 +40,20 @@ import (
mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
-const testMinProxBinSize = 2
const MaxTimeout = 600
type synctestConfig struct {
- addrs [][]byte
- hashes []storage.Address
- idToChunksMap map[discover.NodeID][]int
- chunksToNodesMap map[string][]int
- addrToIDMap map[string]discover.NodeID
+ addrs [][]byte
+ hashes []storage.Address
+ idToChunksMap map[discover.NodeID][]int
+ //chunksToNodesMap map[string][]int
+ addrToIDMap map[string]discover.NodeID
+}
+
+// Tests in this file should not request chunks from peers.
+// This function will panic indicating that there is a problem if request has been made.
+func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
+ panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String()))
}
//This test is a syncing test for nodes.
@@ -58,6 +64,9 @@ type synctestConfig struct {
//they are expected to store based on the syncing protocol.
//Number of chunks and nodes can be provided via commandline too.
func TestSyncingViaGlobalSync(t *testing.T) {
+ if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
+ t.Skip("Flaky on mac on travis")
+ }
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
@@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}
func TestSyncingViaDirectSubscribe(t *testing.T) {
+ if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
+ t.Skip("Flaky on mac on travis")
+ }
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
- err := testSyncingViaDirectSubscribe(*chunks, *nodes)
+ err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
@@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) {
for _, chnk := range chnkCnt {
for _, n := range nodeCnt {
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
- err := testSyncingViaDirectSubscribe(chnk, n)
+ err := testSyncingViaDirectSubscribe(t, chnk, n)
if err != nil {
t.Fatal(err)
}
@@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
@@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
t.Fatal(err)
}
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelSimRun()
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ t.Fatal(err)
+ }
+
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal("unexpected disconnect")
+ cancelSimRun()
+ }
+ }()
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
@@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
conf.hashes = append(conf.hashes, hashes...)
mapKeysToNodes(conf)
- if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
- return err
- }
-
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
allSuccess := false
@@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
}()
}
for !allSuccess {
+ allSuccess = true
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
@@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
- allSuccess = localSuccess
+ if !localSuccess {
+ allSuccess = false
+ break
+ }
}
}
if !allSuccess {
@@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
if result.Error != nil {
t.Fatal(result.Error)
}
+ log.Info("Simulation ended")
}
/*
@@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
*/
-func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
+func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
@@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
})
defer sim.Close()
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelSimRun()
conf := &synctestConfig{}
@@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return err
}
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
+ }
+
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal("unexpected disconnect")
+ cancelSimRun()
+ }
+ }()
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
@@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
// or until the timeout is reached.
allSuccess := false
for !allSuccess {
+ allSuccess = true
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
@@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
- allSuccess = localSuccess
+ if !localSuccess {
+ allSuccess = false
+ break
+ }
}
}
if !allSuccess {
@@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return result.Error
}
- log.Info("Simulation terminated")
+ log.Info("Simulation ended")
return nil
}
@@ -462,10 +527,9 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
//iterate over each bin and solicit needed subscription to bins
kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool {
//identify begin and start index of the bin(s) we want to subscribe to
- histRange := &Range{}
subCnt++
- err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top)
+ err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
if err != nil {
log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
return false
@@ -478,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
//map chunk keys to addresses which are responsible
func mapKeysToNodes(conf *synctestConfig) {
- kmap := make(map[string][]int)
nodemap := make(map[string][]int)
//build a pot for chunk hashes
np := pot.NewPot(nil, 0)
@@ -487,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) {
indexmap[string(a)] = i
np, _, _ = pot.Add(np, a, pof)
}
+
+ var kadMinProxSize = 2
+
+ ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs)
+
//for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes
log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes))
for i := 0; i < len(conf.hashes); i++ {
- pl := 256 //highest possible proximity
- var nns []int
+ var a []byte
np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool {
- a := val.([]byte)
- if pl < 256 && pl != po {
- return false
- }
- if pl == 256 || pl == po {
- log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)]))
- nns = append(nns, indexmap[string(a)])
- nodemap[string(a)] = append(nodemap[string(a)], i)
- }
- if pl == 256 && len(nns) >= testMinProxBinSize {
- //maxProxBinSize has been reached at this po, so save it
- //we will add all other nodes at the same po
- pl = po
- }
- return true
+ // take the first address
+ a = val.([]byte)
+ return false
})
- kmap[string(conf.hashes[i])] = nns
+
+ nns := ppmap[common.Bytes2Hex(a)].NNSet
+ nns = append(nns, a)
+
+ for _, p := range nns {
+ nodemap[string(p)] = append(nodemap[string(p)], i)
+ }
}
for addr, chunks := range nodemap {
//this selects which chunks are expected to be found with the given node
conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
}
log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
- conf.chunksToNodesMap = kmap
}
//upload a file(chunks) to a single local node store
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index deffdfc3f..1f1f34b7b 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -32,10 +32,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/pot"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
- opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -43,8 +41,8 @@ const (
Mid
High
Top
- PriorityQueue // number of queues
- PriorityQueueCap = 32 // queue capacity
+ PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
+ PriorityQueueCap = 128 // queue capacity
HashSize = 32
)
@@ -73,7 +71,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
- return NewSwarmChunkServer(delivery.db), nil
+ return NewSwarmChunkServer(delivery.chunkStore), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
- RegisterSwarmSyncerServer(streamer, db)
- RegisterSwarmSyncerClient(streamer, db)
+ RegisterSwarmSyncerServer(streamer, syncChunkStore)
+ RegisterSwarmSyncerClient(streamer, syncChunkStore)
if options.DoSync {
// latestIntC function ensures that
@@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
return peer.Send(context.TODO(), msg)
}
-func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "registry.retrieve")
- defer sp.Finish()
-
- return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
-}
-
func (r *Registry) NodeInfo() interface{} {
return nil
}
@@ -557,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
- NeedData(context.Context, []byte) func()
+ NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 7523860c9..06e96b9a9 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -80,15 +80,17 @@ func newTestClient(t string) *testClient {
}
}
-func (self *testClient) NeedData(ctx context.Context, hash []byte) func() {
+func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
- return func() {
+ return func(context.Context) error {
<-self.wait0
+ return nil
}
} else if bytes.Equal(hash, hash2[:]) {
- return func() {
+ return func(context.Context) error {
<-self.wait2
+ return nil
}
}
return nil
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index d7febe4a3..e9811a678 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -28,7 +28,6 @@ import (
)
const (
- // BatchSize = 2
BatchSize = 128
)
@@ -38,35 +37,37 @@ const (
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
po uint8
- db *storage.DBAPI
+ store storage.SyncChunkStore
sessionAt uint64
start uint64
+ live bool
quit chan struct{}
}
// NewSwarmSyncerServer is contructor for SwarmSyncerServer
-func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) {
- sessionAt := db.CurrentBucketStorageIndex(po)
+func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
+ sessionAt := syncChunkStore.BinIndex(po)
var start uint64
if live {
start = sessionAt
}
return &SwarmSyncerServer{
po: po,
- db: db,
+ store: syncChunkStore,
sessionAt: sessionAt,
start: start,
+ live: live,
quit: make(chan struct{}),
}, nil
}
-func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
- return NewSwarmSyncerServer(live, po, db)
+ return NewSwarmSyncerServer(live, po, syncChunkStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
@@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() {
close(s.quit)
}
-// GetSection retrieves the actual chunk from localstore
+// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.store.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// GetBatch retrieves the next batch of hashes from the dbstore
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
var batch []byte
i := 0
- if from == 0 {
- from = s.start
- }
- if to <= from || from >= s.sessionAt {
- to = math.MaxUint64
+ if s.live {
+ if from == 0 {
+ from = s.start
+ }
+ if to <= from || from >= s.sessionAt {
+ to = math.MaxUint64
+ }
+ } else {
+ if (to < from && to != 0) || from > s.sessionAt {
+ return nil, 0, 0, nil, nil
+ }
+ if to == 0 || to > s.sessionAt {
+ to = s.sessionAt
+ }
}
+
var ticker *time.Ticker
defer func() {
if ticker != nil {
@@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
}
metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
- err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool {
- batch = append(batch, addr[:]...)
+ err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
+ batch = append(batch, key[:]...)
i++
to = idx
return i < BatchSize
@@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
wait = true
}
- log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po))
+ log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
return batch, from, to, nil, nil
}
@@ -146,28 +155,26 @@ type SwarmSyncerClient struct {
sessionReader storage.LazySectionReader
retrieveC chan *storage.Chunk
storeC chan *storage.Chunk
- db *storage.DBAPI
+ store storage.SyncChunkStore
// chunker storage.Chunker
- currentRoot storage.Address
- requestFunc func(chunk *storage.Chunk)
- end, start uint64
- peer *Peer
- ignoreExistingRequest bool
- stream Stream
+ currentRoot storage.Address
+ requestFunc func(chunk *storage.Chunk)
+ end, start uint64
+ peer *Peer
+ stream Stream
}
// NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
- db: db,
- peer: p,
- ignoreExistingRequest: ignoreExistingRequest,
- stream: stream,
+ store: store,
+ peer: p,
+ stream: stream,
}, nil
}
// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
+// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
// retrieveC := make(storage.Chunk, chunksCap)
// RunChunkRequestor(p, retrieveC)
// storeC := make(storage.Chunk, chunksCap)
@@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live))
+ return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
})
}
// NeedData
-func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) {
- chunk, _ := s.db.GetOrCreateRequest(ctx, key)
- // TODO: we may want to request from this peer anyway even if the request exists
-
- // ignoreExistingRequest is temporary commented out until its functionality is verified.
- // For now, this optimization can be disabled.
- if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) {
- return nil
- }
- // create request and wait until the chunk data arrives and is stored
- return func() {
- chunk.WaitToStore()
- }
+func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
+ return s.store.FetchFunc(ctx, key)
}
// BatchDone
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index f72aa3444..469d520f8 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
}
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
- bucket.Store(bucketKeyDB, db)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyDB, netStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
bucket.Store(bucketKeyDelivery, delivery)
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
@@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
- db := item.(*storage.DBAPI)
- db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
+ netStore := item.(*storage.NetStore)
+ netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
hashes[i] = append(hashes[i], addr)
totalHashes++
hashCounts[i]++
@@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
- db := item.(*storage.DBAPI)
- chunk, err := db.Get(ctx, key)
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
- continue
+ db := item.(*storage.NetStore)
+ _, err := db.Get(ctx, key)
+ if err == nil {
+ found++
}
- // needed for leveldb not to be closed?
- // chunk.WaitToStore()
- found++
}
}
log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)