diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 42 |
1 files changed, 25 insertions, 17 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 9b4658c51..56f242e91 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -32,8 +32,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -235,7 +237,7 @@ func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Rang if e, ok := err.(*notFoundError); ok && e.t == "server" { // request subscription only if the server for this stream is not created log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h) - return peer.Send(&RequestSubscriptionMsg{ + return peer.Send(context.TODO(), &RequestSubscriptionMsg{ Stream: s, History: h, Priority: prio, @@ -285,7 +287,7 @@ func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priorit } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(msg, priority) + return peer.SendPriority(context.TODO(), msg, priority) } func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { @@ -299,7 +301,7 @@ func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { } log.Debug("Unsubscribe ", "peer", peerId, "stream", s) - if err := peer.Send(msg); err != nil { + if err := peer.Send(context.TODO(), msg); err != nil { return err } return peer.removeClient(s) @@ -320,11 +322,17 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { } log.Debug("Quit ", "peer", peerId, "stream", s) - return peer.Send(msg) + return peer.Send(context.TODO(), msg) } -func (r *Registry) Retrieve(chunk *storage.Chunk) error { - return r.delivery.RequestFromPeers(chunk.Addr[:], r.skipCheck) +func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error { + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "registry.retrieve") + defer sp.Finish() + + return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck) } func (r *Registry) NodeInfo() interface{} { @@ -460,11 +468,11 @@ func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { } // HandleMsg is the message handler that delegates incoming messages -func (p *Peer) HandleMsg(msg interface{}) error { +func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *SubscribeMsg: - return p.handleSubscribeMsg(msg) + return p.handleSubscribeMsg(ctx, msg) case *SubscribeErrorMsg: return p.handleSubscribeErrorMsg(msg) @@ -473,22 +481,22 @@ func (p *Peer) HandleMsg(msg interface{}) error { return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: - return p.handleOfferedHashesMsg(msg) + return p.handleOfferedHashesMsg(ctx, msg) case *TakeoverProofMsg: - return p.handleTakeoverProofMsg(msg) + return p.handleTakeoverProofMsg(ctx, msg) case *WantedHashesMsg: - return p.handleWantedHashesMsg(msg) + return p.handleWantedHashesMsg(ctx, msg) case *ChunkDeliveryMsg: - return p.streamer.delivery.handleChunkDeliveryMsg(p, msg) + return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg) case *RetrieveRequestMsg: - return p.streamer.delivery.handleRetrieveRequestMsg(p, msg) + return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) case *RequestSubscriptionMsg: - return p.handleRequestSubscription(msg) + return p.handleRequestSubscription(ctx, msg) case *QuitMsg: return p.handleQuitMsg(msg) @@ -508,7 +516,7 @@ type server struct { // Server interface for outgoing peer Streamer type Server interface { SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) - GetData([]byte) ([]byte, error) + GetData(context.Context, []byte) ([]byte, error) Close() } @@ -551,7 +559,7 @@ func (c client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData([]byte) func() + NeedData(context.Context, []byte) func() BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } @@ -588,7 +596,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error if err != nil { return err } - if err := p.SendPriority(tp, c.priority); err != nil { + if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { |