diff options
Diffstat (limited to 'swarm/network/stream/delivery.go')
-rw-r--r-- | swarm/network/stream/delivery.go | 47 |
1 files changed, 34 insertions, 13 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 75aabad6c..fa210e300 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -17,6 +17,7 @@ package stream import ( + "context" "errors" "time" @@ -25,7 +26,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -118,8 +121,8 @@ func (s *SwarmChunkServer) Close() { } // GetData retrives chunk data from db store -func (s *SwarmChunkServer) GetData(key []byte) ([]byte, error) { - chunk, err := s.db.Get(storage.Address(key)) +func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { + chunk, err := s.db.Get(ctx, storage.Address(key)) if err == storage.ErrFetching { <-chunk.ReqC } else if err != nil { @@ -134,25 +137,37 @@ type RetrieveRequestMsg struct { SkipCheck bool } -func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) error { +func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error { log.Trace("received request", "peer", sp.ID(), "hash", req.Addr) handleRetrieveRequestMsgCount.Inc(1) + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "retrieve.request") + defer osp.Finish() + s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false)) if err != nil { return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(req.Addr) + chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) if chunk.ReqC != nil { if created { - if err := d.RequestFromPeers(chunk.Addr[:], true, sp.ID()); err != nil { + if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) chunk.SetErrored(storage.ErrChunkForward) return nil } } go func() { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "waiting.delivery") + defer osp.Finish() + t := time.NewTimer(10 * time.Minute) defer t.Stop() @@ -169,7 +184,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e chunk.SetErrored(nil) if req.SkipCheck { - err := sp.Deliver(chunk, s.priority) + err := sp.Deliver(ctx, chunk, s.priority) if err != nil { log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) sp.Drop(err) @@ -185,7 +200,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e if length := len(chunk.SData); length < 9 { log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) } - return sp.Deliver(chunk, s.priority) + return sp.Deliver(ctx, chunk, s.priority) } streamer.deliveryC <- chunk.Addr[:] return nil @@ -197,7 +212,13 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } -func (d *Delivery) handleChunkDeliveryMsg(sp *Peer, req *ChunkDeliveryMsg) error { +func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( + ctx, + "chunk.delivery") + defer osp.Finish() + req.peer = sp d.receiveC <- req return nil @@ -209,7 +230,7 @@ R: processReceivedChunksCount.Inc(1) // this should be has locally - chunk, err := d.db.Get(req.Addr) + chunk, err := d.db.Get(context.TODO(), req.Addr) if err == nil { continue R } @@ -224,7 +245,7 @@ R: default: } chunk.SData = req.SData - d.db.Put(chunk) + d.db.Put(context.TODO(), chunk) go func(req *ChunkDeliveryMsg) { err := chunk.WaitToStore() @@ -236,10 +257,11 @@ R: } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { +func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { var success bool var err error requestFromPeersCount.Inc(1) + d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool { spId := p.(network.Peer).ID() for _, p := range peersToSkip { @@ -253,8 +275,7 @@ func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ... log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) return true } - // TODO: skip light nodes that do not accept retrieve requests - err = sp.SendPriority(&RetrieveRequestMsg{ + err = sp.SendPriority(ctx, &RetrieveRequestMsg{ Addr: hash, SkipCheck: skipCheck, }, Top) |