diff options
Diffstat (limited to 'swarm/storage/chunker.go')
-rw-r--r-- | swarm/storage/chunker.go | 53 |
1 files changed, 44 insertions, 9 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 2d197fefa..b9b502273 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -26,6 +26,9 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" + olog "github.com/opentracing/opentracing-go/log" ) /* @@ -93,9 +96,12 @@ type JoinerParams struct { getter Getter // TODO: there is a bug, so depth can only be 0 today, see: https://github.com/ethersphere/go-ethereum/issues/344 depth int + ctx context.Context } type TreeChunker struct { + ctx context.Context + branches int64 hashFunc SwarmHasher dataSize int64 @@ -136,6 +142,7 @@ func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *Lazy addr: addr, getter: getter, depth: depth, + ctx: ctx, } return NewTreeJoiner(jp).Join(ctx) @@ -174,6 +181,8 @@ func NewTreeJoiner(params *JoinerParams) *TreeChunker { tc.errC = make(chan error) tc.quitC = make(chan bool) + tc.ctx = params.ctx + return tc } @@ -351,7 +360,7 @@ func (tc *TreeChunker) runWorker() { return } - h, err := tc.putter.Put(job.chunk) + h, err := tc.putter.Put(tc.ctx, job.chunk) if err != nil { tc.errC <- err return @@ -371,6 +380,7 @@ func (tc *TreeChunker) Append() (Address, func(), error) { // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { + Ctx context.Context key Address // root key chunkData ChunkData off int64 // offset @@ -389,16 +399,28 @@ func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader { hashSize: tc.hashSize, depth: tc.depth, getter: tc.getter, + Ctx: tc.ctx, } } +func (r *LazyChunkReader) Context() context.Context { + return r.Ctx +} + // Size is meant to be called on the LazySectionReader -func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { +func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, err error) { metrics.GetOrRegisterCounter("lazychunkreader.size", nil).Inc(1) + var sp opentracing.Span + var cctx context.Context + cctx, sp = spancontext.StartSpan( + ctx, + "lcr.size") + defer sp.Finish() + log.Debug("lazychunkreader.size", "key", r.key) if r.chunkData == nil { - chunkData, err := r.getter.Get(Reference(r.key)) + chunkData, err := r.getter.Get(cctx, Reference(r.key)) if err != nil { return 0, err } @@ -421,12 +443,25 @@ func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { metrics.GetOrRegisterCounter("lazychunkreader.readat", nil).Inc(1) + var sp opentracing.Span + var cctx context.Context + cctx, sp = spancontext.StartSpan( + r.Ctx, + "lcr.read") + defer sp.Finish() + + defer func() { + sp.LogFields( + olog.Int("off", int(off)), + olog.Int("read", read)) + }() + // this is correct, a swarm doc cannot be zero length, so no EOF is expected if len(b) == 0 { return 0, nil } quitC := make(chan bool) - size, err := r.Size(quitC) + size, err := r.Size(cctx, quitC) if err != nil { log.Error("lazychunkreader.readat.size", "size", size, "err", err) return 0, err @@ -449,7 +484,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { length *= r.chunkSize } wg.Add(1) - go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) + go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -467,7 +502,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { return len(b), nil } -func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // find appropriate block level for chunkData.Size() < treeSize && depth > r.depth { @@ -514,7 +549,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS wg.Add(1) go func(j int64) { childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] - chunkData, err := r.getter.Get(Reference(childKey)) + chunkData, err := r.getter.Get(ctx, Reference(childKey)) if err != nil { log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err) select { @@ -533,7 +568,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS if soff < off { soff = off } - r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) + r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) }(i) } //for } @@ -570,7 +605,7 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { offset += r.off case 2: if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first - _, err := r.Size(nil) + _, err := r.Size(context.TODO(), nil) if err != nil { return 0, fmt.Errorf("can't get size: %v", err) } |