aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/peer.go')
-rw-r--r--swarm/network/stream/peer.go40
1 files changed, 33 insertions, 7 deletions
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 29984a911..80b9ab711 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -27,8 +27,10 @@ import (
"github.com/ethereum/go-ethereum/swarm/log"
pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
)
var sendTimeout = 30 * time.Second
@@ -62,6 +64,11 @@ type Peer struct {
quit chan struct{}
}
+type WrappedPriorityMsg struct {
+ Context context.Context
+ Msg interface{}
+}
+
// NewPeer is the constructor for Peer
func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
p := &Peer{
@@ -74,7 +81,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
quit: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
- go p.pq.Run(ctx, func(i interface{}) { p.Send(i) })
+ go p.pq.Run(ctx, func(i interface{}) {
+ wmsg := i.(WrappedPriorityMsg)
+ p.Send(wmsg.Context, wmsg.Msg)
+ })
go func() {
<-p.quit
cancel()
@@ -83,25 +93,41 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(chunk *storage.Chunk, priority uint8) error {
+func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error {
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ "send.chunk.delivery")
+ defer sp.Finish()
+
msg := &ChunkDeliveryMsg{
Addr: chunk.Addr,
SData: chunk.SData,
}
- return p.SendPriority(msg, priority)
+ return p.SendPriority(ctx, msg, priority)
}
// SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(msg interface{}, priority uint8) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
- ctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
+ cctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
defer cancel()
- return p.pq.Push(ctx, msg, int(priority))
+ wmsg := WrappedPriorityMsg{
+ Context: ctx,
+ Msg: msg,
+ }
+ return p.pq.Push(cctx, wmsg, int(priority))
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg
func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
+ var sp opentracing.Span
+ ctx, sp := spancontext.StartSpan(
+ context.TODO(),
+ "send.offered.hashes")
+ defer sp.Finish()
+
hashes, from, to, proof, err := s.SetNextBatch(f, t)
if err != nil {
return err
@@ -124,7 +150,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
- return p.SendPriority(msg, s.priority)
+ return p.SendPriority(ctx, msg, s.priority)
}
func (p *Peer) getServer(s Stream) (*server, error) {