aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/stream.go10
-rw-r--r--swarm/network/stream/syncer.go5
2 files changed, 15 insertions, 0 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 8e2a5f31a..3bc450455 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -597,6 +597,16 @@ func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
// HandleMsg is the message handler that delegates incoming messages
func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
+ select {
+ case <-p.streamer.quit:
+ log.Trace("message received after the streamer is closed", "peer", p.ID())
+ // return without an error since streamer is closed and
+ // no messages should be handled as other subcomponents like
+ // storage leveldb may be closed
+ return nil
+ default:
+ }
+
switch msg := msg.(type) {
case *SubscribeMsg:
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index 4fb8b9342..5f03dcff7 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -107,6 +107,11 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
+ select {
+ case <-s.quit:
+ return false
+ default:
+ }
batch = append(batch, key[:]...)
i++
to = idx