aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/chunker.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/chunker.go')
-rw-r--r--swarm/storage/chunker.go102
1 files changed, 49 insertions, 53 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index 6d805b8e2..40292e88f 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -22,10 +22,9 @@ import (
"fmt"
"io"
"sync"
- "time"
"github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/swarm/chunk"
+ ch "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"
opentracing "github.com/opentracing/opentracing-go"
@@ -67,7 +66,6 @@ The hashing itself does use extra copies and allocation though, since it does ne
var (
errAppendOppNotSuported = errors.New("Append operation not supported")
- errOperationTimedOut = errors.New("operation timed out")
)
type ChunkerParams struct {
@@ -133,7 +131,7 @@ type TreeChunker struct {
func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *LazyChunkReader {
jp := &JoinerParams{
ChunkerParams: ChunkerParams{
- chunkSize: chunk.DefaultSize,
+ chunkSize: ch.DefaultSize,
hashSize: int64(len(addr)),
},
addr: addr,
@@ -153,7 +151,7 @@ func TreeSplit(ctx context.Context, data io.Reader, size int64, putter Putter) (
tsp := &TreeSplitterParams{
SplitterParams: SplitterParams{
ChunkerParams: ChunkerParams{
- chunkSize: chunk.DefaultSize,
+ chunkSize: ch.DefaultSize,
hashSize: putter.RefSize(),
},
reader: data,
@@ -201,11 +199,6 @@ func NewTreeSplitter(params *TreeSplitterParams) *TreeChunker {
return tc
}
-// String() for pretty printing
-func (c *Chunk) String() string {
- return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", c.Addr.Log(), c.Size, len(c.SData))
-}
-
type hashJob struct {
key Address
chunk []byte
@@ -236,7 +229,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.
panic("chunker must be initialised")
}
- tc.runWorker()
+ tc.runWorker(ctx)
depth := 0
treeSize := tc.chunkSize
@@ -251,7 +244,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.
// this waitgroup member is released after the root hash is calculated
tc.wg.Add(1)
//launch actual recursive function passing the waitgroups
- go tc.split(depth, treeSize/tc.branches, key, tc.dataSize, tc.wg)
+ go tc.split(ctx, depth, treeSize/tc.branches, key, tc.dataSize, tc.wg)
// closes internal error channel if all subprocesses in the workgroup finished
go func() {
@@ -267,14 +260,14 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.
if err != nil {
return nil, nil, err
}
- case <-time.NewTimer(splitTimeout).C:
- return nil, nil, errOperationTimedOut
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
}
return key, tc.putter.Wait, nil
}
-func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) {
+func (tc *TreeChunker) split(ctx context.Context, depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) {
//
@@ -321,10 +314,10 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64
secSize = treeSize
}
// the hash of that data
- subTreeKey := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize]
+ subTreeAddress := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize]
childrenWg.Add(1)
- tc.split(depth-1, treeSize/tc.branches, subTreeKey, secSize, childrenWg)
+ tc.split(ctx, depth-1, treeSize/tc.branches, subTreeAddress, secSize, childrenWg)
i++
pos += treeSize
@@ -336,7 +329,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64
worker := tc.getWorkerCount()
if int64(len(tc.jobC)) > worker && worker < ChunkProcessors {
- tc.runWorker()
+ tc.runWorker(ctx)
}
select {
@@ -345,7 +338,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64
}
}
-func (tc *TreeChunker) runWorker() {
+func (tc *TreeChunker) runWorker(ctx context.Context) {
tc.incrementWorkerCount()
go func() {
defer tc.decrementWorkerCount()
@@ -357,7 +350,7 @@ func (tc *TreeChunker) runWorker() {
return
}
- h, err := tc.putter.Put(tc.ctx, job.chunk)
+ h, err := tc.putter.Put(ctx, job.chunk)
if err != nil {
tc.errC <- err
return
@@ -377,8 +370,8 @@ func (tc *TreeChunker) Append() (Address, func(), error) {
// LazyChunkReader implements LazySectionReader
type LazyChunkReader struct {
- Ctx context.Context
- key Address // root key
+ ctx context.Context
+ addr Address // root address
chunkData ChunkData
off int64 // offset
chunkSize int64 // inherit from chunker
@@ -390,18 +383,18 @@ type LazyChunkReader struct {
func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader {
return &LazyChunkReader{
- key: tc.addr,
+ addr: tc.addr,
chunkSize: tc.chunkSize,
branches: tc.branches,
hashSize: tc.hashSize,
depth: tc.depth,
getter: tc.getter,
- Ctx: tc.ctx,
+ ctx: tc.ctx,
}
}
func (r *LazyChunkReader) Context() context.Context {
- return r.Ctx
+ return r.ctx
}
// Size is meant to be called on the LazySectionReader
@@ -415,23 +408,24 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e
"lcr.size")
defer sp.Finish()
- log.Debug("lazychunkreader.size", "key", r.key)
+ log.Debug("lazychunkreader.size", "addr", r.addr)
if r.chunkData == nil {
- chunkData, err := r.getter.Get(cctx, Reference(r.key))
+ chunkData, err := r.getter.Get(cctx, Reference(r.addr))
if err != nil {
return 0, err
}
- if chunkData == nil {
- select {
- case <-quitC:
- return 0, errors.New("aborted")
- default:
- return 0, fmt.Errorf("root chunk not found for %v", r.key.Hex())
- }
- }
r.chunkData = chunkData
+ s := r.chunkData.Size()
+ log.Debug("lazychunkreader.size", "key", r.addr, "size", s)
+ if s < 0 {
+ return 0, errors.New("corrupt size")
+ }
+ return int64(s), nil
}
- return r.chunkData.Size(), nil
+ s := r.chunkData.Size()
+ log.Debug("lazychunkreader.size", "key", r.addr, "size", s)
+
+ return int64(s), nil
}
// read at can be called numerous times
@@ -443,7 +437,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
var sp opentracing.Span
var cctx context.Context
cctx, sp = spancontext.StartSpan(
- r.Ctx,
+ r.ctx,
"lcr.read")
defer sp.Finish()
@@ -460,7 +454,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
quitC := make(chan bool)
size, err := r.Size(cctx, quitC)
if err != nil {
- log.Error("lazychunkreader.readat.size", "size", size, "err", err)
+ log.Debug("lazychunkreader.readat.size", "size", size, "err", err)
return 0, err
}
@@ -481,7 +475,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
length *= r.chunkSize
}
wg.Add(1)
- go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
+ go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC)
go func() {
wg.Wait()
close(errC)
@@ -489,20 +483,22 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
err = <-errC
if err != nil {
- log.Error("lazychunkreader.readat.errc", "err", err)
+ log.Debug("lazychunkreader.readat.errc", "err", err)
close(quitC)
return 0, err
}
if off+int64(len(b)) >= size {
+ log.Debug("lazychunkreader.readat.return at end", "size", size, "off", off)
return int(size - off), io.EOF
}
+ log.Debug("lazychunkreader.readat.errc", "buff", len(b))
return len(b), nil
}
-func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
+func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
defer parentWg.Done()
// find appropriate block level
- for chunkData.Size() < treeSize && depth > r.depth {
+ for chunkData.Size() < uint64(treeSize) && depth > r.depth {
treeSize /= r.branches
depth--
}
@@ -545,19 +541,19 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
}
wg.Add(1)
go func(j int64) {
- childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
- chunkData, err := r.getter.Get(ctx, Reference(childKey))
+ childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
+ chunkData, err := r.getter.Get(r.ctx, Reference(childAddress))
if err != nil {
- log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err)
+ log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
select {
- case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childKey)):
+ case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
case <-quitC:
}
return
}
if l := len(chunkData); l < 9 {
select {
- case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childKey), l):
+ case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l):
case <-quitC:
}
return
@@ -565,26 +561,26 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
if soff < off {
soff = off
}
- r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
+ r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC)
}(i)
} //for
}
// Read keeps a cursor so cannot be called simulateously, see ReadAt
func (r *LazyChunkReader) Read(b []byte) (read int, err error) {
- log.Debug("lazychunkreader.read", "key", r.key)
+ log.Debug("lazychunkreader.read", "key", r.addr)
metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1)
read, err = r.ReadAt(b, r.off)
if err != nil && err != io.EOF {
- log.Error("lazychunkreader.readat", "read", read, "err", err)
+ log.Debug("lazychunkreader.readat", "read", read, "err", err)
metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1)
}
metrics.GetOrRegisterCounter("lazychunkreader.read.bytes", nil).Inc(int64(read))
r.off += int64(read)
- return
+ return read, err
}
// completely analogous to standard SectionReader implementation
@@ -592,7 +588,7 @@ var errWhence = errors.New("Seek: invalid whence")
var errOffset = errors.New("Seek: invalid offset")
func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
- log.Debug("lazychunkreader.seek", "key", r.key, "offset", offset)
+ log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset)
switch whence {
default:
return 0, errWhence
@@ -607,7 +603,7 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
return 0, fmt.Errorf("can't get size: %v", err)
}
}
- offset += r.chunkData.Size()
+ offset += int64(r.chunkData.Size())
}
if offset < 0 {