aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/delivery.go7
-rw-r--r--swarm/network/stream/delivery_test.go6
-rw-r--r--swarm/network/stream/intervals_test.go16
-rw-r--r--swarm/network/stream/peer.go13
-rw-r--r--swarm/network/stream/stream.go30
-rw-r--r--swarm/network/stream/streamer_test.go42
-rw-r--r--swarm/network/stream/syncer.go52
7 files changed, 89 insertions, 77 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index c2adb1009..0429c4dff 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -96,6 +96,11 @@ func (s *SwarmChunkServer) processDeliveries() {
}
}
+// SessionIndex returns zero in all cases for SwarmChunkServer.
+func (s *SwarmChunkServer) SessionIndex() (uint64, error) {
+ return 0, nil
+}
+
// SetNextBatch
func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) {
select {
@@ -141,7 +146,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
"retrieve.request")
defer osp.Finish()
- s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false))
+ s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true))
if err != nil {
return err
}
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index b021b8771..c6ebae3f0 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -88,7 +88,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
peer := streamer.getPeer(node.ID())
peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
- Stream: NewStream(swarmChunkServerStreamName, "", false),
+ Stream: NewStream(swarmChunkServerStreamName, "", true),
History: nil,
Priority: Top,
})
@@ -136,7 +136,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
node := tester.Nodes[0]
peer := streamer.getPeer(node.ID())
- stream := NewStream(swarmChunkServerStreamName, "", false)
+ stream := NewStream(swarmChunkServerStreamName, "", true)
peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
Stream: stream,
@@ -409,7 +409,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
return fmt.Errorf("No registry")
}
registry := item.(*Registry)
- err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top)
+ err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), NewRange(0, 0), Top)
if err != nil {
return err
}
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 269259423..3164193b3 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -345,8 +345,6 @@ func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*
func (c *testExternalClient) Close() {}
-const testExternalServerBatchSize = 10
-
type testExternalServer struct {
t string
keyFunc func(key []byte, index uint64)
@@ -366,17 +364,11 @@ func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key
}
}
+func (s *testExternalServer) SessionIndex() (uint64, error) {
+ return s.sessionAt, nil
+}
+
func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
- if from == 0 && to == 0 {
- from = s.sessionAt
- to = s.sessionAt + testExternalServerBatchSize
- }
- if to-from > testExternalServerBatchSize {
- to = from + testExternalServerBatchSize - 1
- }
- if from >= s.maxKeys && to > s.maxKeys {
- return nil, 0, 0, nil, io.EOF
- }
if to > s.maxKeys {
to = s.maxKeys
}
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index ef6bbdf70..89d135ad5 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -166,7 +166,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
"send.offered.hashes")
defer sp.Finish()
- hashes, from, to, proof, err := s.SetNextBatch(f, t)
+ hashes, from, to, proof, err := s.setNextBatch(f, t)
if err != nil {
return err
}
@@ -214,10 +214,15 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
return nil, ErrMaxPeerServers
}
+ sessionIndex, err := o.SessionIndex()
+ if err != nil {
+ return nil, err
+ }
os := &server{
- Server: o,
- stream: s,
- priority: priority,
+ Server: o,
+ stream: s,
+ priority: priority,
+ sessionIndex: sessionIndex,
}
p.servers[s] = os
return os, nil
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 1eda06c6a..3861cfcf6 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -375,7 +375,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
defer sp.close()
if r.doRetrieve {
- err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", false), nil, Top)
+ err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
if err != nil {
return err
}
@@ -500,10 +500,38 @@ type server struct {
stream Stream
priority uint8
currentBatch []byte
+ sessionIndex uint64
+}
+
+// setNextBatch adjusts passed interval based on session index and whether
+// stream is live or history. It calls Server SetNextBatch with adjusted
+// interval and returns batch hashes and their interval.
+func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
+ if s.stream.Live {
+ if from == 0 {
+ from = s.sessionIndex
+ }
+ if to <= from || from >= s.sessionIndex {
+ to = math.MaxUint64
+ }
+ } else {
+ if (to < from && to != 0) || from > s.sessionIndex {
+ return nil, 0, 0, nil, nil
+ }
+ if to == 0 || to > s.sessionIndex {
+ to = s.sessionIndex
+ }
+ }
+ return s.SetNextBatch(from, to)
}
// Server interface for outgoing peer Streamer
type Server interface {
+ // SessionIndex is called when a server is initialized
+ // to get the current cursor state of the stream data.
+ // Based on this index, live and history stream intervals
+ // will be adjusted before calling SetNextBatch.
+ SessionIndex() (uint64, error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
GetData(context.Context, []byte) ([]byte, error)
Close()
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 5d91eecfd..e7f79e7a1 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -107,15 +107,21 @@ func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*Takeo
func (self *testClient) Close() {}
type testServer struct {
- t string
+ t string
+ sessionIndex uint64
}
-func newTestServer(t string) *testServer {
+func newTestServer(t string, sessionIndex uint64) *testServer {
return &testServer{
- t: t,
+ t: t,
+ sessionIndex: sessionIndex,
}
}
+func (s *testServer) SessionIndex() (uint64, error) {
+ return s.sessionIndex, nil
+}
+
func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
return make([]byte, HashSize), from + 1, to + 1, nil, nil
}
@@ -230,7 +236,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
stream := NewStream("foo", "", false)
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t), nil
+ return newTestServer(t, 10), nil
})
node := tester.Nodes[0]
@@ -297,7 +303,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
stream := NewStream("foo", "", true)
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t), nil
+ return newTestServer(t, 0), nil
})
node := tester.Nodes[0]
@@ -324,7 +330,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
},
Hashes: make([]byte, HashSize),
From: 1,
- To: 1,
+ To: 0,
},
Peer: node.ID(),
},
@@ -361,7 +367,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t), nil
+ return newTestServer(t, 0), nil
})
stream := NewStream("bar", "", true)
@@ -407,9 +413,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
stream := NewStream("foo", "", true)
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return &testServer{
- t: t,
- }, nil
+ return newTestServer(t, 10), nil
})
node := tester.Nodes[0]
@@ -448,8 +452,8 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
- From: 1,
- To: 1,
+ From: 11,
+ To: 0,
Hashes: make([]byte, HashSize),
},
Peer: node.ID(),
@@ -634,7 +638,7 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t), nil
+ return newTestServer(t, 10), nil
})
node := tester.Nodes[0]
@@ -694,8 +698,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
- From: 1,
- To: 1,
+ From: 11,
+ To: 0,
Hashes: make([]byte, HashSize),
},
Peer: node.ID(),
@@ -769,7 +773,7 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t), nil
+ return newTestServer(t, 0), nil
})
node := tester.Nodes[0]
@@ -799,7 +803,7 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
},
Hashes: make([]byte, HashSize),
From: 1,
- To: 1,
+ To: 0,
},
Peer: node.ID(),
},
@@ -843,7 +847,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
- return newTestServer(t), nil
+ return newTestServer(t, 0), nil
})
node := tester.Nodes[0]
@@ -903,7 +907,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
},
Hashes: make([]byte, HashSize),
From: 1,
- To: 1,
+ To: 0,
},
Peer: node.ID(),
},
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index 38b3078d2..4bfbac8b0 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -18,7 +18,6 @@ package stream
import (
"context"
- "math"
"strconv"
"time"
@@ -36,38 +35,27 @@ const (
// * live request delivery with or without checkback
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
- po uint8
- store storage.SyncChunkStore
- sessionAt uint64
- start uint64
- live bool
- quit chan struct{}
+ po uint8
+ store storage.SyncChunkStore
+ quit chan struct{}
}
-// NewSwarmSyncerServer is contructor for SwarmSyncerServer
-func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
- sessionAt := syncChunkStore.BinIndex(po)
- var start uint64
- if live {
- start = sessionAt
- }
+// NewSwarmSyncerServer is constructor for SwarmSyncerServer
+func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
return &SwarmSyncerServer{
- po: po,
- store: syncChunkStore,
- sessionAt: sessionAt,
- start: start,
- live: live,
- quit: make(chan struct{}),
+ po: po,
+ store: syncChunkStore,
+ quit: make(chan struct{}),
}, nil
}
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
- streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
+ streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
- return NewSwarmSyncerServer(live, po, syncChunkStore)
+ return NewSwarmSyncerServer(po, syncChunkStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
@@ -88,25 +76,15 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er
return chunk.Data(), nil
}
+// SessionIndex returns current storage bin (po) index.
+func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
+ return s.store.BinIndex(s.po), nil
+}
+
// GetBatch retrieves the next batch of hashes from the dbstore
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
var batch []byte
i := 0
- if s.live {
- if from == 0 {
- from = s.start
- }
- if to <= from || from >= s.sessionAt {
- to = math.MaxUint64
- }
- } else {
- if (to < from && to != 0) || from > s.sessionAt {
- return nil, 0, 0, nil, nil
- }
- if to == 0 || to > s.sessionAt {
- to = s.sessionAt
- }
- }
var ticker *time.Ticker
defer func() {