Diffstat (limited to 'swarm/storage/chunker.go')
-rw-r--r--  swarm/storage/chunker.go | 102
1 file changed, 49 insertions, 53 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index 6d805b8e2..40292e88f 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -22,10 +22,9 @@ import (
 	"fmt"
 	"io"
 	"sync"
-	"time"
 
 	"github.com/ethereum/go-ethereum/metrics"
-	"github.com/ethereum/go-ethereum/swarm/chunk"
+	ch "github.com/ethereum/go-ethereum/swarm/chunk"
 	"github.com/ethereum/go-ethereum/swarm/log"
 	"github.com/ethereum/go-ethereum/swarm/spancontext"
 	opentracing "github.com/opentracing/opentracing-go"
@@ -67,7 +66,6 @@ The hashing itself does use extra copies and allocation though, since it does ne
 
 var (
 	errAppendOppNotSuported = errors.New("Append operation not supported")
-	errOperationTimedOut    = errors.New("operation timed out")
 )
 
 type ChunkerParams struct {
@@ -133,7 +131,7 @@ type TreeChunker struct {
 func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *LazyChunkReader {
 	jp := &JoinerParams{
 		ChunkerParams: ChunkerParams{
-			chunkSize: chunk.DefaultSize,
+			chunkSize: ch.DefaultSize,
 			hashSize:  int64(len(addr)),
 		},
 		addr:   addr,
@@ -153,7 +151,7 @@ func TreeSplit(ctx context.Context, data io.Reader, size int64, putter Putter) (
 	tsp := &TreeSplitterParams{
 		SplitterParams: SplitterParams{
 			ChunkerParams: ChunkerParams{
-				chunkSize: chunk.DefaultSize,
+				chunkSize: ch.DefaultSize,
 				hashSize:  putter.RefSize(),
 			},
 			reader: data,
@@ -201,11 +199,6 @@ func NewTreeSplitter(params *TreeSplitterParams) *TreeChunker {
 	return tc
 }
 
-// String() for pretty printing
-func (c *Chunk) String() string {
-	return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", c.Addr.Log(), c.Size, len(c.SData))
-}
-
 type hashJob struct {
 	key   Address
 	chunk []byte
@@ -236,7 +229,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.
 		panic("chunker must be initialised")
 	}
 
-	tc.runWorker()
+	tc.runWorker(ctx)
 
 	depth := 0
 	treeSize := tc.chunkSize
@@ -251,7 +244,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.
 	// this waitgroup member is released after the root hash is calculated
 	tc.wg.Add(1)
 	//launch actual recursive function passing the waitgroups
-	go tc.split(depth, treeSize/tc.branches, key, tc.dataSize, tc.wg)
+	go tc.split(ctx, depth, treeSize/tc.branches, key, tc.dataSize, tc.wg)
 
 	// closes internal error channel if all subprocesses in the workgroup finished
 	go func() {
@@ -267,14 +260,14 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.
 		if err != nil {
 			return nil, nil, err
 		}
-	case <-time.NewTimer(splitTimeout).C:
-		return nil, nil, errOperationTimedOut
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
 	}
 
 	return key, tc.putter.Wait, nil
 }
 
-func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) {
+func (tc *TreeChunker) split(ctx context.Context, depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) {
 
 	//
 
@@ -321,10 +314,10 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64
 			secSize = treeSize
 		}
 		// the hash of that data
-		subTreeKey := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize]
+		subTreeAddress := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize]
 
 		childrenWg.Add(1)
-		tc.split(depth-1, treeSize/tc.branches, subTreeKey, secSize, childrenWg)
+		tc.split(ctx, depth-1, treeSize/tc.branches, subTreeAddress, secSize, childrenWg)
 
 		i++
 		pos += treeSize
@@ -336,7 +329,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64
 
 	worker := tc.getWorkerCount()
 	if int64(len(tc.jobC)) > worker && worker < ChunkProcessors {
-		tc.runWorker()
+		tc.runWorker(ctx)
 
 	}
 	select {
@@ -345,7 +338,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64
 	}
 }
 
-func (tc *TreeChunker) runWorker() {
+func (tc *TreeChunker) runWorker(ctx context.Context) {
 	tc.incrementWorkerCount()
 	go func() {
 		defer tc.decrementWorkerCount()
@@ -357,7 +350,7 @@ func (tc *TreeChunker) runWorker() {
 					return
 				}
 
-				h, err := tc.putter.Put(tc.ctx, job.chunk)
+				h, err := tc.putter.Put(ctx, job.chunk)
 				if err != nil {
 					tc.errC <- err
 					return
@@ -377,8 +370,8 @@ func (tc *TreeChunker) Append() (Address, func(), error) {
 
 // LazyChunkReader implements LazySectionReader
 type LazyChunkReader struct {
-	Ctx       context.Context
-	key       Address // root key
+	ctx       context.Context
+	addr      Address // root address
 	chunkData ChunkData
 	off       int64 // offset
 	chunkSize int64 // inherit from chunker
@@ -390,18 +383,18 @@ type LazyChunkReader struct {
 
 func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader {
 	return &LazyChunkReader{
-		key:       tc.addr,
+		addr:      tc.addr,
 		chunkSize: tc.chunkSize,
 		branches:  tc.branches,
 		hashSize:  tc.hashSize,
 		depth:     tc.depth,
 		getter:    tc.getter,
-		Ctx:       tc.ctx,
+		ctx:       tc.ctx,
 	}
 }
 
 func (r *LazyChunkReader) Context() context.Context {
-	return r.Ctx
+	return r.ctx
 }
 
 // Size is meant to be called on the LazySectionReader
@@ -415,23 +408,24 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e
 		"lcr.size")
 	defer sp.Finish()
 
-	log.Debug("lazychunkreader.size", "key", r.key)
+	log.Debug("lazychunkreader.size", "addr", r.addr)
 	if r.chunkData == nil {
-		chunkData, err := r.getter.Get(cctx, Reference(r.key))
+		chunkData, err := r.getter.Get(cctx, Reference(r.addr))
 		if err != nil {
 			return 0, err
 		}
-		if chunkData == nil {
-			select {
-			case <-quitC:
-				return 0, errors.New("aborted")
-			default:
-				return 0, fmt.Errorf("root chunk not found for %v", r.key.Hex())
-			}
-		}
 		r.chunkData = chunkData
+		s := r.chunkData.Size()
+		log.Debug("lazychunkreader.size", "key", r.addr, "size", s)
+		if s < 0 {
+			return 0, errors.New("corrupt size")
+		}
+		return int64(s), nil
 	}
-	return r.chunkData.Size(), nil
+	s := r.chunkData.Size()
+	log.Debug("lazychunkreader.size", "key", r.addr, "size", s)
+
+	return int64(s), nil
 }
 
 // read at can be called numerous times
@@ -443,7 +437,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 	var sp opentracing.Span
 	var cctx context.Context
 	cctx, sp = spancontext.StartSpan(
-		r.Ctx,
+		r.ctx,
 		"lcr.read")
 	defer sp.Finish()
 
@@ -460,7 +454,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 	quitC := make(chan bool)
 	size, err := r.Size(cctx, quitC)
 	if err != nil {
-		log.Error("lazychunkreader.readat.size", "size", size, "err", err)
+		log.Debug("lazychunkreader.readat.size", "size", size, "err", err)
 		return 0, err
 	}
 
@@ -481,7 +475,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 		length *= r.chunkSize
 	}
 	wg.Add(1)
-	go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
+	go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
 	go func() {
 		wg.Wait()
 		close(errC)
@@ -489,20 +483,22 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
 
 	err = <-errC
 	if err != nil {
-		log.Error("lazychunkreader.readat.errc", "err", err)
+		log.Debug("lazychunkreader.readat.errc", "err", err)
 		close(quitC)
 		return 0, err
 	}
 	if off+int64(len(b)) >= size {
+		log.Debug("lazychunkreader.readat.return at end", "size", size, "off", off)
 		return int(size - off), io.EOF
 	}
+	log.Debug("lazychunkreader.readat.errc", "buff", len(b))
 	return len(b), nil
 }
 
-func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
+func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
 	defer parentWg.Done()
 	// find appropriate block level
-	for chunkData.Size() < treeSize && depth > r.depth {
+	for chunkData.Size() < uint64(treeSize) && depth > r.depth {
 		treeSize /= r.branches
 		depth--
 	}
@@ -545,19 +541,19 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
 		}
 		wg.Add(1)
 		go func(j int64) {
-			childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
-			chunkData, err := r.getter.Get(ctx, Reference(childKey))
+			childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
+			chunkData, err := r.getter.Get(r.ctx, Reference(childAddress))
 			if err != nil {
-				log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err)
+				log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
 				select {
-				case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childKey)):
+				case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
 				case <-quitC:
 				}
 				return
 			}
 			if l := len(chunkData); l < 9 {
 				select {
-				case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childKey), l):
+				case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l):
 				case <-quitC:
 				}
 				return
@@ -565,26 +561,26 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
 			if soff < off {
 				soff = off
 			}
-			r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
+			r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
 		}(i)
 	} //for
 }
 
 // Read keeps a cursor so cannot be called simulateously, see ReadAt
 func (r *LazyChunkReader) Read(b []byte) (read int, err error) {
-	log.Debug("lazychunkreader.read", "key", r.key)
+	log.Debug("lazychunkreader.read", "key", r.addr)
 	metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1)
 
 	read, err = r.ReadAt(b, r.off)
 	if err != nil && err != io.EOF {
-		log.Error("lazychunkreader.readat", "read", read, "err", err)
+		log.Debug("lazychunkreader.readat", "read", read, "err", err)
 		metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1)
 	}
 
 	metrics.GetOrRegisterCounter("lazychunkreader.read.bytes", nil).Inc(int64(read))
 
 	r.off += int64(read)
-	return
+	return read, err
 }
 
 // completely analogous to standard SectionReader implementation
@@ -592,7 +588,7 @@ var errWhence = errors.New("Seek: invalid whence")
 var errOffset = errors.New("Seek: invalid offset")
 
 func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
-	log.Debug("lazychunkreader.seek", "key", r.key, "offset", offset)
+	log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset)
 	switch whence {
 	default:
 		return 0, errWhence
@@ -607,7 +603,7 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
 				return 0, fmt.Errorf("can't get size: %v", err)
 			}
 		}
-		offset += r.chunkData.Size()
+		offset += int64(r.chunkData.Size())
 	}
 
 	if offset < 0 {