aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/peer.go')
-rw-r--r--swarm/network/stream/peer.go51
1 files changed, 42 insertions, 9 deletions
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 80b9ab711..1466a7a9c 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -33,8 +33,6 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
-var sendTimeout = 30 * time.Second
-
type notFoundError struct {
t string
s Stream
@@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
- p.Send(wmsg.Context, wmsg.Msg)
+ err := p.Send(wmsg.Context, wmsg.Msg)
+ if err != nil {
+ log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ }
})
+
+ // basic monitoring for pq contention
+ go func(pq *pq.PriorityQueue) {
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ var len_maxi int
+ var cap_maxi int
+ for k := range pq.Queues {
+ if len_maxi < len(pq.Queues[k]) {
+ len_maxi = len(pq.Queues[k])
+ }
+
+ if cap_maxi < cap(pq.Queues[k]) {
+ cap_maxi = cap(pq.Queues[k])
+ }
+ }
+
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi))
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi))
+ case <-p.quit:
+ return
+ }
+ }
+ }(p.pq)
+
go func() {
<-p.quit
cancel()
@@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error {
+func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
@@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
defer sp.Finish()
msg := &ChunkDeliveryMsg{
- Addr: chunk.Addr,
- SData: chunk.SData,
+ Addr: chunk.Address(),
+ SData: chunk.Data(),
}
return p.SendPriority(ctx, msg, priority)
}
@@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
- cctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
- defer cancel()
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
}
- return p.pq.Push(cctx, wmsg, int(priority))
+ err := p.pq.Push(wmsg, int(priority))
+ if err == pq.ErrContention {
+ log.Warn("dropping peer on priority queue contention", "peer", p.ID())
+ p.Drop(err)
+ }
+ return err
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg