aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2018-07-13 23:40:28 +0800
committerBalint Gabor <balint.g@gmail.com>2018-07-13 23:40:28 +0800
commit7c9314f231a7ddffbbbc5fec16c65519a0121eeb (patch)
treedbc4021b66ee8968ad747036741fac7e1b972a39 /swarm/network/stream
parentf7d3678c28c4b92e45a458e4785bd0f1cdc20e34 (diff)
downloaddexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.gz
dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.tar.zst
dexon-7c9314f231a7ddffbbbc5fec16c65519a0121eeb.zip
swarm: integrate OpenTracing; propagate ctx to internal APIs (#17169)
* swarm: propagate ctx, enable opentracing * swarm/tracing: log error when tracing is misconfigured
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/common_test.go14
-rw-r--r--swarm/network/stream/delivery.go47
-rw-r--r--swarm/network/stream/delivery_test.go24
-rw-r--r--swarm/network/stream/messages.go29
-rw-r--r--swarm/network/stream/peer.go40
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go10
-rw-r--r--swarm/network/stream/snapshot_sync_test.go2
-rw-r--r--swarm/network/stream/stream.go42
-rw-r--r--swarm/network/stream/streamer_test.go5
-rw-r--r--swarm/network/stream/syncer.go9
-rw-r--r--swarm/network/stream/syncer_test.go2
11 files changed, 145 insertions, 79 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 6a2c27401..4d55c6ee3 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -126,7 +126,7 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) {
return testRegistry, nil
}
-func defaultRetrieveFunc(id discover.NodeID) func(chunk *storage.Chunk) error {
+func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error {
return nil
}
@@ -217,14 +217,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
-func (rrs *roundRobinStore) Get(addr storage.Address) (*storage.Chunk, error) {
+func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
-func (rrs *roundRobinStore) Put(chunk *storage.Chunk) {
+func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
- rrs.stores[idx].Put(chunk)
+ rrs.stores[idx].Put(ctx, chunk)
}
func (rrs *roundRobinStore) Close() {
@@ -369,8 +369,8 @@ func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
}
}
-func (c *testExternalClient) NeedData(hash []byte) func() {
- chunk, _ := c.db.GetOrCreateRequest(hash)
+func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
+ chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
if chunk.ReqC == nil {
return nil
}
@@ -429,7 +429,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6
return b, from, to, nil, nil
}
-func (s *testExternalServer) GetData([]byte) ([]byte, error) {
+func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
return make([]byte, 4096), nil
}
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 75aabad6c..fa210e300 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -17,6 +17,7 @@
package stream
import (
+ "context"
"errors"
"time"
@@ -25,7 +26,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -118,8 +121,8 @@ func (s *SwarmChunkServer) Close() {
}
// GetData retrives chunk data from db store
-func (s *SwarmChunkServer) GetData(key []byte) ([]byte, error) {
- chunk, err := s.db.Get(storage.Address(key))
+func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
+ chunk, err := s.db.Get(ctx, storage.Address(key))
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {
@@ -134,25 +137,37 @@ type RetrieveRequestMsg struct {
SkipCheck bool
}
-func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) error {
+func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
log.Trace("received request", "peer", sp.ID(), "hash", req.Addr)
handleRetrieveRequestMsgCount.Inc(1)
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "retrieve.request")
+ defer osp.Finish()
+
s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", false))
if err != nil {
return err
}
streamer := s.Server.(*SwarmChunkServer)
- chunk, created := d.db.GetOrCreateRequest(req.Addr)
+ chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr)
if chunk.ReqC != nil {
if created {
- if err := d.RequestFromPeers(chunk.Addr[:], true, sp.ID()); err != nil {
+ if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil {
log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err)
chunk.SetErrored(storage.ErrChunkForward)
return nil
}
}
go func() {
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "waiting.delivery")
+ defer osp.Finish()
+
t := time.NewTimer(10 * time.Minute)
defer t.Stop()
@@ -169,7 +184,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e
chunk.SetErrored(nil)
if req.SkipCheck {
- err := sp.Deliver(chunk, s.priority)
+ err := sp.Deliver(ctx, chunk, s.priority)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
sp.Drop(err)
@@ -185,7 +200,7 @@ func (d *Delivery) handleRetrieveRequestMsg(sp *Peer, req *RetrieveRequestMsg) e
if length := len(chunk.SData); length < 9 {
log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr)
}
- return sp.Deliver(chunk, s.priority)
+ return sp.Deliver(ctx, chunk, s.priority)
}
streamer.deliveryC <- chunk.Addr[:]
return nil
@@ -197,7 +212,13 @@ type ChunkDeliveryMsg struct {
peer *Peer // set in handleChunkDeliveryMsg
}
-func (d *Delivery) handleChunkDeliveryMsg(sp *Peer, req *ChunkDeliveryMsg) error {
+func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
+ ctx,
+ "chunk.delivery")
+ defer osp.Finish()
+
req.peer = sp
d.receiveC <- req
return nil
@@ -209,7 +230,7 @@ R:
processReceivedChunksCount.Inc(1)
// this should be has locally
- chunk, err := d.db.Get(req.Addr)
+ chunk, err := d.db.Get(context.TODO(), req.Addr)
if err == nil {
continue R
}
@@ -224,7 +245,7 @@ R:
default:
}
chunk.SData = req.SData
- d.db.Put(chunk)
+ d.db.Put(context.TODO(), chunk)
go func(req *ChunkDeliveryMsg) {
err := chunk.WaitToStore()
@@ -236,10 +257,11 @@ R:
}
// RequestFromPeers sends a chunk retrieve request to
-func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
+func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
var success bool
var err error
requestFromPeersCount.Inc(1)
+
d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool {
spId := p.(network.Peer).ID()
for _, p := range peersToSkip {
@@ -253,8 +275,7 @@ func (d *Delivery) RequestFromPeers(hash []byte, skipCheck bool, peersToSkip ...
log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId)
return true
}
- // TODO: skip light nodes that do not accept retrieve requests
- err = sp.SendPriority(&RetrieveRequestMsg{
+ err = sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: hash,
SkipCheck: skipCheck,
}, Top)
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index cd87557b1..f3da893a2 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -46,7 +46,7 @@ func TestStreamerRetrieveRequest(t *testing.T) {
peerID := tester.IDs[0]
- streamer.delivery.RequestFromPeers(hash0[:], true)
+ streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -80,7 +80,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
peer := streamer.getPeer(peerID)
- peer.handleSubscribeMsg(&SubscribeMsg{
+ peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
Stream: NewStream(swarmChunkServerStreamName, "", false),
History: nil,
Priority: Top,
@@ -131,7 +131,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
stream := NewStream(swarmChunkServerStreamName, "", false)
- peer.handleSubscribeMsg(&SubscribeMsg{
+ peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
Stream: stream,
History: nil,
Priority: Top,
@@ -140,7 +140,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
hash := storage.Address(hash0[:])
chunk := storage.NewChunk(hash, nil)
chunk.SData = hash
- localStore.Put(chunk)
+ localStore.Put(context.TODO(), chunk)
chunk.WaitToStore()
err = tester.TestExchanges(p2ptest.Exchange{
@@ -179,7 +179,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
hash = storage.Address(hash1[:])
chunk = storage.NewChunk(hash, nil)
chunk.SData = hash1[:]
- localStore.Put(chunk)
+ localStore.Put(context.TODO(), chunk)
chunk.WaitToStore()
err = tester.TestExchanges(p2ptest.Exchange{
@@ -234,7 +234,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
chunkKey := hash0[:]
chunkData := hash1[:]
- chunk, created := localStore.GetOrCreateRequest(chunkKey)
+ chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey)
if !created {
t.Fatal("chunk already exists")
@@ -285,7 +285,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
case <-chunk.ReqC:
}
- storedChunk, err := localStore.Get(chunkKey)
+ storedChunk, err := localStore.Get(context.TODO(), chunkKey)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
@@ -401,8 +401,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
}
// create a retriever FileStore for the pivot node
delivery := deliveries[sim.IDs[0]]
- retrieveFunc := func(chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
+ retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
+ return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
@@ -617,8 +617,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
// create a retriever FileStore for the pivot node
// by now deliveries are set for each node by the streamer service
delivery := deliveries[sim.IDs[0]]
- retrieveFunc := func(chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(chunk.Addr[:], skipCheck)
+ retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
+ return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc)
@@ -650,7 +650,7 @@ Loop:
errs := make(chan error)
for _, hash := range hashes {
go func(h storage.Address) {
- _, err := netStore.Get(h)
+ _, err := netStore.Get(ctx, h)
log.Warn("test check netstore get", "hash", h, "err", err)
errs <- err
}(hash)
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index 5668a73e9..a19f63589 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -17,6 +17,7 @@
package stream
import (
+ "context"
"errors"
"fmt"
"sync"
@@ -25,7 +26,9 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
bv "github.com/ethereum/go-ethereum/swarm/network/bitvector"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
)
// Stream defines a unique stream identifier.
@@ -71,17 +74,17 @@ type RequestSubscriptionMsg struct {
Priority uint8 // delivered on priority channel
}
-func (p *Peer) handleRequestSubscription(req *RequestSubscriptionMsg) (err error) {
+func (p *Peer) handleRequestSubscription(ctx context.Context, req *RequestSubscriptionMsg) (err error) {
log.Debug(fmt.Sprintf("handleRequestSubscription: streamer %s to subscribe to %s with stream %s", p.streamer.addr.ID(), p.ID(), req.Stream))
return p.streamer.Subscribe(p.ID(), req.Stream, req.History, req.Priority)
}
-func (p *Peer) handleSubscribeMsg(req *SubscribeMsg) (err error) {
+func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err error) {
metrics.GetOrRegisterCounter("peer.handlesubscribemsg", nil).Inc(1)
defer func() {
if err != nil {
- if e := p.Send(SubscribeErrorMsg{
+ if e := p.Send(context.TODO(), SubscribeErrorMsg{
Error: err.Error(),
}); e != nil {
log.Error("send stream subscribe error message", "err", err)
@@ -181,9 +184,15 @@ func (m OfferedHashesMsg) String() string {
// handleOfferedHashesMsg protocol msg handler calls the incoming streamer interface
// Filter method
-func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
+func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1)
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ "handle.offered.hashes")
+ defer sp.Finish()
+
c, _, err := p.getOrSetClient(req.Stream, req.From, req.To)
if err != nil {
return err
@@ -197,7 +206,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
for i := 0; i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
- if wait := c.NeedData(hash); wait != nil {
+ if wait := c.NeedData(ctx, hash); wait != nil {
want.Set(i/HashSize, true)
wg.Add(1)
// create request and wait until the chunk data arrives and is stored
@@ -260,7 +269,7 @@ func (p *Peer) handleOfferedHashesMsg(req *OfferedHashesMsg) error {
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
- err := p.SendPriority(msg, c.priority)
+ err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
log.Warn("SendPriority err, so dropping peer", "err", err)
p.Drop(err)
@@ -285,7 +294,7 @@ func (m WantedHashesMsg) String() string {
// handleWantedHashesMsg protocol msg handler
// * sends the next batch of unsynced keys
// * sends the actual data chunks as per WantedHashesMsg
-func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
+func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg", nil).Inc(1)
log.Trace("received wanted batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
@@ -314,7 +323,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
metrics.GetOrRegisterCounter("peer.handlewantedhashesmsg.actualget", nil).Inc(1)
hash := hashes[i*HashSize : (i+1)*HashSize]
- data, err := s.GetData(hash)
+ data, err := s.GetData(ctx, hash)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
@@ -323,7 +332,7 @@ func (p *Peer) handleWantedHashesMsg(req *WantedHashesMsg) error {
if length := len(chunk.SData); length < 9 {
log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
}
- if err := p.Deliver(chunk, s.priority); err != nil {
+ if err := p.Deliver(ctx, chunk, s.priority); err != nil {
return err
}
}
@@ -363,7 +372,7 @@ func (m TakeoverProofMsg) String() string {
return fmt.Sprintf("Stream: '%v' [%v-%v], Root: %x, Sig: %x", m.Stream, m.Start, m.End, m.Root, m.Sig)
}
-func (p *Peer) handleTakeoverProofMsg(req *TakeoverProofMsg) error {
+func (p *Peer) handleTakeoverProofMsg(ctx context.Context, req *TakeoverProofMsg) error {
_, err := p.getServer(req.Stream)
// store the strongest takeoverproof for the stream in streamer
return err
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 29984a911..80b9ab711 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -27,8 +27,10 @@ import (
"github.com/ethereum/go-ethereum/swarm/log"
pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
)
var sendTimeout = 30 * time.Second
@@ -62,6 +64,11 @@ type Peer struct {
quit chan struct{}
}
+type WrappedPriorityMsg struct {
+ Context context.Context
+ Msg interface{}
+}
+
// NewPeer is the constructor for Peer
func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
p := &Peer{
@@ -74,7 +81,10 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
quit: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
- go p.pq.Run(ctx, func(i interface{}) { p.Send(i) })
+ go p.pq.Run(ctx, func(i interface{}) {
+ wmsg := i.(WrappedPriorityMsg)
+ p.Send(wmsg.Context, wmsg.Msg)
+ })
go func() {
<-p.quit
cancel()
@@ -83,25 +93,41 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(chunk *storage.Chunk, priority uint8) error {
+func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error {
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ "send.chunk.delivery")
+ defer sp.Finish()
+
msg := &ChunkDeliveryMsg{
Addr: chunk.Addr,
SData: chunk.SData,
}
- return p.SendPriority(msg, priority)
+ return p.SendPriority(ctx, msg, priority)
}
// SendPriority sends message to the peer using the outgoing priority queue
-func (p *Peer) SendPriority(msg interface{}, priority uint8) error {
+func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
- ctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
+ cctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
defer cancel()
- return p.pq.Push(ctx, msg, int(priority))
+ wmsg := WrappedPriorityMsg{
+ Context: ctx,
+ Msg: msg,
+ }
+ return p.pq.Push(cctx, wmsg, int(priority))
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg
func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
+ var sp opentracing.Span
+ ctx, sp := spancontext.StartSpan(
+ context.TODO(),
+ "send.offered.hashes")
+ defer sp.Finish()
+
hashes, from, to, proof, err := s.SetNextBatch(f, t)
if err != nil {
return err
@@ -124,7 +150,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
- return p.SendPriority(msg, s.priority)
+ return p.SendPriority(ctx, msg, s.priority)
}
func (p *Peer) getServer(s Stream) (*server, error) {
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index da5253e8a..9961a0bc7 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -55,10 +55,10 @@ func initRetrievalTest() {
//deliveries for each node
deliveries = make(map[discover.NodeID]*Delivery)
//global retrieve func
- getRetrieveFunc = func(id discover.NodeID) func(chunk *storage.Chunk) error {
- return func(chunk *storage.Chunk) error {
+ getRetrieveFunc = func(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error {
+ return func(ctx context.Context, chunk *storage.Chunk) error {
skipCheck := true
- return deliveries[id].RequestFromPeers(chunk.Addr[:], skipCheck)
+ return deliveries[id].RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
}
//registries, map of discover.NodeID to its streamer
@@ -412,7 +412,7 @@ func runFileRetrievalTest(nodeCount int) error {
for i, hash := range conf.hashes {
reader, _ := fileStore.Retrieve(context.TODO(), hash)
//check that we can read the file size and that it corresponds to the generated file size
- if s, err := reader.Size(nil); err != nil || s != int64(len(randomFiles[i])) {
+ if s, err := reader.Size(context.TODO(), nil); err != nil || s != int64(len(randomFiles[i])) {
allSuccess = false
log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
} else {
@@ -699,7 +699,7 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
for _, chnk := range conf.hashes {
reader, _ := fileStore.Retrieve(context.TODO(), chnk)
//assuming that reading the Size of the chunk is enough to know we found it
- if s, err := reader.Size(nil); err != nil || s != chunkSize {
+ if s, err := reader.Size(context.TODO(), nil); err != nil || s != chunkSize {
allSuccess = false
log.Warn("Retrieve error", "err", err, "chunk", chnk, "nodeId", id)
} else {
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index fd8863d43..0b5257c60 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -437,7 +437,7 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error {
} else {
//use the actual localstore
lstore := stores[id]
- _, err = lstore.Get(chunk)
+ _, err = lstore.Get(context.TODO(), chunk)
}
if err != nil {
log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 9b4658c51..56f242e91 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -32,8 +32,10 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/pot"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
+ opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -235,7 +237,7 @@ func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Rang
if e, ok := err.(*notFoundError); ok && e.t == "server" {
// request subscription only if the server for this stream is not created
log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h)
- return peer.Send(&RequestSubscriptionMsg{
+ return peer.Send(context.TODO(), &RequestSubscriptionMsg{
Stream: s,
History: h,
Priority: prio,
@@ -285,7 +287,7 @@ func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priorit
}
log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
- return peer.SendPriority(msg, priority)
+ return peer.SendPriority(context.TODO(), msg, priority)
}
func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error {
@@ -299,7 +301,7 @@ func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error {
}
log.Debug("Unsubscribe ", "peer", peerId, "stream", s)
- if err := peer.Send(msg); err != nil {
+ if err := peer.Send(context.TODO(), msg); err != nil {
return err
}
return peer.removeClient(s)
@@ -320,11 +322,17 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
}
log.Debug("Quit ", "peer", peerId, "stream", s)
- return peer.Send(msg)
+ return peer.Send(context.TODO(), msg)
}
-func (r *Registry) Retrieve(chunk *storage.Chunk) error {
- return r.delivery.RequestFromPeers(chunk.Addr[:], r.skipCheck)
+func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
+ var sp opentracing.Span
+ ctx, sp = spancontext.StartSpan(
+ ctx,
+ "registry.retrieve")
+ defer sp.Finish()
+
+ return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
}
func (r *Registry) NodeInfo() interface{} {
@@ -460,11 +468,11 @@ func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
}
// HandleMsg is the message handler that delegates incoming messages
-func (p *Peer) HandleMsg(msg interface{}) error {
+func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case *SubscribeMsg:
- return p.handleSubscribeMsg(msg)
+ return p.handleSubscribeMsg(ctx, msg)
case *SubscribeErrorMsg:
return p.handleSubscribeErrorMsg(msg)
@@ -473,22 +481,22 @@ func (p *Peer) HandleMsg(msg interface{}) error {
return p.handleUnsubscribeMsg(msg)
case *OfferedHashesMsg:
- return p.handleOfferedHashesMsg(msg)
+ return p.handleOfferedHashesMsg(ctx, msg)
case *TakeoverProofMsg:
- return p.handleTakeoverProofMsg(msg)
+ return p.handleTakeoverProofMsg(ctx, msg)
case *WantedHashesMsg:
- return p.handleWantedHashesMsg(msg)
+ return p.handleWantedHashesMsg(ctx, msg)
case *ChunkDeliveryMsg:
- return p.streamer.delivery.handleChunkDeliveryMsg(p, msg)
+ return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg)
case *RetrieveRequestMsg:
- return p.streamer.delivery.handleRetrieveRequestMsg(p, msg)
+ return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
case *RequestSubscriptionMsg:
- return p.handleRequestSubscription(msg)
+ return p.handleRequestSubscription(ctx, msg)
case *QuitMsg:
return p.handleQuitMsg(msg)
@@ -508,7 +516,7 @@ type server struct {
// Server interface for outgoing peer Streamer
type Server interface {
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
- GetData([]byte) ([]byte, error)
+ GetData(context.Context, []byte) ([]byte, error)
Close()
}
@@ -551,7 +559,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
- NeedData([]byte) func()
+ NeedData(context.Context, []byte) func()
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}
@@ -588,7 +596,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error
if err != nil {
return err
}
- if err := p.SendPriority(tp, c.priority); err != nil {
+ if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 44622c995..7523860c9 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -18,6 +18,7 @@ package stream
import (
"bytes"
+ "context"
"testing"
"time"
@@ -79,7 +80,7 @@ func newTestClient(t string) *testClient {
}
}
-func (self *testClient) NeedData(hash []byte) func() {
+func (self *testClient) NeedData(ctx context.Context, hash []byte) func() {
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
return func() {
@@ -114,7 +115,7 @@ func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, ui
return make([]byte, HashSize), from + 1, to + 1, nil, nil
}
-func (self *testServer) GetData([]byte) ([]byte, error) {
+func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
return nil, nil
}
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index 5510b2409..d7febe4a3 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -17,6 +17,7 @@
package stream
import (
+ "context"
"math"
"strconv"
"time"
@@ -78,8 +79,8 @@ func (s *SwarmSyncerServer) Close() {
}
// GetSection retrieves the actual chunk from localstore
-func (s *SwarmSyncerServer) GetData(key []byte) ([]byte, error) {
- chunk, err := s.db.Get(storage.Address(key))
+func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
+ chunk, err := s.db.Get(ctx, storage.Address(key))
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {
@@ -210,8 +211,8 @@ func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
}
// NeedData
-func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) {
- chunk, _ := s.db.GetOrCreateRequest(key)
+func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) {
+ chunk, _ := s.db.GetOrCreateRequest(ctx, key)
// TODO: we may want to request from this peer anyway even if the request exists
// ignoreExistingRequest is temporary commented out until its functionality is verified.
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index 5fea7befe..a3d53e648 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -231,7 +231,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
for j := i; j < nodes; j++ {
total += len(hashes[j])
for _, key := range hashes[j] {
- chunk, err := dbs[i].Get(key)
+ chunk, err := dbs[i].Get(ctx, key)
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {