diff options
Diffstat (limited to 'swarm/network/stream/peer.go')
-rw-r--r-- | swarm/network/stream/peer.go | 40 |
1 files changed, 33 insertions, 7 deletions
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 29984a911..80b9ab711 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -27,8 +27,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/log" pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) var sendTimeout = 30 * time.Second @@ -62,6 +64,11 @@ type Peer struct { quit chan struct{} } +type WrappedPriorityMsg struct { + Context context.Context + Msg interface{} +} + // NewPeer is the constructor for Peer func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { p := &Peer{ @@ -74,7 +81,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { quit: make(chan struct{}), } ctx, cancel := context.WithCancel(context.Background()) - go p.pq.Run(ctx, func(i interface{}) { p.Send(i) }) + go p.pq.Run(ctx, func(i interface{}) { + wmsg := i.(WrappedPriorityMsg) + p.Send(wmsg.Context, wmsg.Msg) + }) go func() { <-p.quit cancel() @@ -83,25 +93,41 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(chunk *storage.Chunk, priority uint8) error { +func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "send.chunk.delivery") + defer sp.Finish() + msg := &ChunkDeliveryMsg{ Addr: chunk.Addr, SData: chunk.SData, } - return p.SendPriority(msg, priority) + return p.SendPriority(ctx, msg, priority) } // SendPriority sends message to the peer using the outgoing priority queue -func (p *Peer) SendPriority(msg interface{}, priority uint8) error { +func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) + cctx, cancel := context.WithTimeout(context.Background(), sendTimeout) defer cancel() - return p.pq.Push(ctx, msg, int(priority)) + wmsg := WrappedPriorityMsg{ + Context: ctx, + Msg: msg, + } + return p.pq.Push(cctx, wmsg, int(priority)) } // SendOfferedHashes sends OfferedHashesMsg protocol msg func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { + var sp opentracing.Span + ctx, sp := spancontext.StartSpan( + context.TODO(), + "send.offered.hashes") + defer sp.Finish() + hashes, from, to, proof, err := s.SetNextBatch(f, t) if err != nil { return err @@ -124,7 +150,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) - return p.SendPriority(msg, s.priority) + return p.SendPriority(ctx, msg, s.priority) } func (p *Peer) getServer(s Stream) (*server, error) { |