aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/delivery.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/delivery.go')
-rw-r--r--swarm/network/stream/delivery.go47
1 files changed, 34 insertions, 13 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 75aabad6c..fa210e300 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -17,6 +17,7 @@
package stream
import (
+ "context"
"errors"
"time"
@@ -25,7 +26,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -118,8 +121,8 @@ func (s *SwarmChunkServer) Close() {
}
// GetData retrives chunk data from db store
-func (s *SwarmChunkServer) GetData(key []byte) ([]byte, error) {
- chunk, err := s.db.Get(storage.Address(key))
+func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
+ chunk, err := s.db.Get(ctx, storage.Address(key))
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {
@@ -134,25 +137,37 @@ type RetrieveRequestMsg struct {
SkipCheck bool
}
-func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) error {
+func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
log.Trace("received request", "peer", sp.ID(), "hash", req.Addr)
handleRetrieveRequestMsgCount.Inc(1)
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "retrieve.request")
+ defer osp.Finish()
+
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false))
if err != nil {
return err
}
streamer := s.Server.(*SwarmChunkServer)
- chunk, created := d.db.GetOrCreateRequest(req.Addr)
+ chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr)
if chunk.ReqC != nil {
if created {
- if err := d.RequestFromPeers(chunk.Addr[:], true, sp.ID()); err != nil {
+ if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil {
log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err)
chunk.SetErrored(storage.ErrChunkForward)
return nil
}
}
go func() {
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "waiting.delivery")
+ defer osp.Finish()
+
t := time.NewTimer(10 * time.Minute)
defer t.Stop()
@@ -169,7 +184,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e
chunk.SetErrored(nil)
if req.SkipCheck {
- err := sp.Deliver(chunk, s.priority)
+ err := sp.Deliver(ctx, chunk, s.priority)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
sp.Drop(err)
@@ -185,7 +200,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e
if length := len(chunk.SData); length < 9 {
log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr)
}
- return sp.Deliver(chunk, s.priority)
+ return sp.Deliver(ctx, chunk, s.priority)
}
streamer.deliveryC <- chunk.Addr[:]
return nil
@@ -197,7 +212,13 @@ type ChunkDeliveryMsg struct {
peer *Peer // set in handleChunkDeliveryMsg
}
-func (d *Delivery) handleChunkDeliveryMsg(sp *Peer, req *ChunkDeliveryMsg) error {
+func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "chunk.delivery")
+ defer osp.Finish()
+
req.peer = sp
d.receiveC <- req
return nil
@@ -209,7 +230,7 @@ R:
processReceivedChunksCount.Inc(1)
// this should be has locally
- chunk, err := d.db.Get(req.Addr)
+ chunk, err := d.db.Get(context.TODO(), req.Addr)
if err == nil {
continue R
}
@@ -224,7 +245,7 @@ R:
default:
}
chunk.SData = req.SData
- d.db.Put(chunk)
+ d.db.Put(context.TODO(), chunk)
go func(req *ChunkDeliveryMsg) {
err := chunk.WaitToStore()
@@ -236,10 +257,11 @@ R:
}
// RequestFromPeers sends a chunk retrieve request to
-func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
+func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
var success bool
var err error
requestFromPeersCount.Inc(1)
+
d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool {
spId := p.(network.Peer).ID()
for _, p := range peersToSkip {
@@ -253,8 +275,7 @@ func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ...
log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId)
return true
}
- // TODO: skip light nodes that do not accept retrieve requests
- err = sp.SendPriority(&RetrieveRequestMsg{
+ err = sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: hash,
SkipCheck: skipCheck,
}, Top)