aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go30
1 files changed, 29 insertions, 1 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 1eda06c6a..3861cfcf6 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -375,7 +375,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
defer sp.close()
if r.doRetrieve {
- err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", false), nil, Top)
+ err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
if err != nil {
return err
}
@@ -500,10 +500,38 @@ type server struct {
stream Stream
priority uint8
currentBatch []byte
+ sessionIndex uint64
+}
+
+// setNextBatch adjusts passed interval based on session index and whether
+// stream is live or history. It calls Server SetNextBatch with adjusted
+// interval and returns batch hashes and their interval.
+func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
+ if s.stream.Live {
+ if from == 0 {
+ from = s.sessionIndex
+ }
+ if to <= from || from >= s.sessionIndex {
+ to = math.MaxUint64
+ }
+ } else {
+ if (to < from && to != 0) || from > s.sessionIndex {
+ return nil, 0, 0, nil, nil
+ }
+ if to == 0 || to > s.sessionIndex {
+ to = s.sessionIndex
+ }
+ }
+ return s.SetNextBatch(from, to)
}
// Server interface for outgoing peer Streamer
type Server interface {
+ // SessionIndex is called when a server is initialized
+ // to get the current cursor state of the stream data.
+ // Based on this index, live and history stream intervals
+ // will be adjusted before calling SetNextBatch.
+ SessionIndex() (uint64, error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
GetData(context.Context, []byte) ([]byte, error)
Close()