diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 30 |
1 files changed, 29 insertions, 1 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 1eda06c6a..3861cfcf6 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -375,7 +375,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer sp.close() if r.doRetrieve { - err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", false), nil, Top) + err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err } @@ -500,10 +500,38 @@ type server struct { stream Stream priority uint8 currentBatch []byte + sessionIndex uint64 +} + +// setNextBatch adjusts passed interval based on session index and whether +// stream is live or history. It calls Server SetNextBatch with adjusted +// interval and returns batch hashes and their interval. +func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { + if s.stream.Live { + if from == 0 { + from = s.sessionIndex + } + if to <= from || from >= s.sessionIndex { + to = math.MaxUint64 + } + } else { + if (to < from && to != 0) || from > s.sessionIndex { + return nil, 0, 0, nil, nil + } + if to == 0 || to > s.sessionIndex { + to = s.sessionIndex + } + } + return s.SetNextBatch(from, to) } // Server interface for outgoing peer Streamer type Server interface { + // SessionIndex is called when a server is initialized + // to get the current cursor state of the stream data. + // Based on this index, live and history stream intervals + // will be adjusted before calling SetNextBatch. + SessionIndex() (uint64, error) SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) GetData(context.Context, []byte) ([]byte, error) Close() |