diff options
Diffstat (limited to 'swarm/storage/pyramid.go')
-rw-r--r-- | swarm/storage/pyramid.go | 77 |
1 files changed, 41 insertions, 36 deletions
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 36ff66d04..f74eef06b 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" ) @@ -57,7 +57,7 @@ import ( When certain no of data chunks are created (defaultBranches), a signal is sent to create a tree entry. When the level 0 tree entries reaches certain threshold (defaultBranches), another signal is sent to a tree entry one level up.. and so on... until only the data is exhausted AND only one - tree entry is present in certain level. The key of tree entry is given out as the rootKey of the file. + tree entry is present in certain level. The key of tree entry is given out as the rootAddress of the file. */ @@ -98,15 +98,15 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get } /* - When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes. + When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Address), the root hash of the entire content will fill this once processing finishes. New chunks to store are store using the putter which the caller provides. */ func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, ch.DefaultSize)).Split(ctx) } func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, ch.DefaultSize)).Append(ctx) } // Entry to create a tree node @@ -153,7 +153,7 @@ type PyramidChunker struct { wg *sync.WaitGroup errC chan error quitC chan bool - rootKey []byte + rootAddress []byte chunkLevel [][]*TreeEntry } @@ -171,14 +171,14 @@ func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { pc.wg = &sync.WaitGroup{} pc.errC = make(chan error) pc.quitC = make(chan bool) - pc.rootKey = make([]byte, pc.hashSize) + pc.rootAddress = make([]byte, pc.hashSize) pc.chunkLevel = make([][]*TreeEntry, pc.branches) return } func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader { return &LazyChunkReader{ - key: addr, + addr: addr, depth: depth, chunkSize: pc.chunkSize, branches: pc.branches, @@ -209,7 +209,7 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte log.Debug("pyramid.chunker: Split()") pc.wg.Add(1) - pc.prepareChunks(false) + pc.prepareChunks(ctx, false) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -231,19 +231,21 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte if err != nil { return nil, nil, err } - case <-time.NewTimer(splitTimeout).C: + case <-ctx.Done(): + _ = pc.putter.Wait(ctx) //??? + return nil, nil, ctx.Err() } - return pc.rootKey, pc.putter.Wait, nil + return pc.rootAddress, pc.putter.Wait, nil } func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) { log.Debug("pyramid.chunker: Append()") // Load the right most unfinished tree chunks in every level - pc.loadTree() + pc.loadTree(ctx) pc.wg.Add(1) - pc.prepareChunks(true) + pc.prepareChunks(ctx, true) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -265,11 +267,11 @@ func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(cont case <-time.NewTimer(splitTimeout).C: } - return pc.rootKey, pc.putter.Wait, nil + return pc.rootAddress, pc.putter.Wait, nil } -func (pc *PyramidChunker) processor(id int64) { +func (pc *PyramidChunker) processor(ctx context.Context, id int64) { defer pc.decrementWorkerCount() for { select { @@ -278,19 +280,22 @@ func (pc *PyramidChunker) processor(id int64) { if !ok { return } - pc.processChunk(id, job) + pc.processChunk(ctx, id, job) case <-pc.quitC: return } } } -func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { +func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) { log.Debug("pyramid.chunker: processChunk()", "id", id) - ref, err := pc.putter.Put(context.TODO(), job.chunk) + ref, err := pc.putter.Put(ctx, job.chunk) if err != nil { - pc.errC <- err + select { + case pc.errC <- err: + case <-pc.quitC: + } } // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) @@ -300,14 +305,14 @@ func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { job.parentWg.Done() } -func (pc *PyramidChunker) loadTree() error { +func (pc *PyramidChunker) loadTree(ctx context.Context) error { log.Debug("pyramid.chunker: loadTree()") // Get the root chunk to get the total size - chunkData, err := pc.getter.Get(context.TODO(), Reference(pc.key)) + chunkData, err := pc.getter.Get(ctx, Reference(pc.key)) if err != nil { return errLoadingTreeRootChunk } - chunkSize := chunkData.Size() + chunkSize := int64(chunkData.Size()) log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize) //if data size is less than a chunk... add a parent with update as pending @@ -356,7 +361,7 @@ func (pc *PyramidChunker) loadTree() error { branchCount = int64(len(ent.chunk)-8) / pc.hashSize for i := int64(0); i < branchCount; i++ { key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] - newChunkData, err := pc.getter.Get(context.TODO(), Reference(key)) + newChunkData, err := pc.getter.Get(ctx, Reference(key)) if err != nil { return errLoadingTreeChunk } @@ -365,7 +370,7 @@ func (pc *PyramidChunker) loadTree() error { newEntry := &TreeEntry{ level: lvl - 1, branchCount: bewBranchCount, - subtreeSize: uint64(newChunkSize), + subtreeSize: newChunkSize, chunk: newChunkData, key: key, index: 0, @@ -385,7 +390,7 @@ func (pc *PyramidChunker) loadTree() error { return nil } -func (pc *PyramidChunker) prepareChunks(isAppend bool) { +func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) { log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend) defer pc.wg.Done() @@ -393,11 +398,11 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { pc.incrementWorkerCount() - go pc.processor(pc.workerCount) + go pc.processor(ctx, pc.workerCount) parent := NewTreeEntry(pc) var unfinishedChunkData ChunkData - var unfinishedChunkSize int64 + var unfinishedChunkSize uint64 if isAppend && len(pc.chunkLevel[0]) != 0 { lastIndex := len(pc.chunkLevel[0]) - 1 @@ -415,16 +420,16 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { } lastBranch := parent.branchCount - 1 - lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] + lastAddress := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] var err error - unfinishedChunkData, err = pc.getter.Get(context.TODO(), lastKey) + unfinishedChunkData, err = pc.getter.Get(ctx, lastAddress) if err != nil { pc.errC <- err } unfinishedChunkSize = unfinishedChunkData.Size() - if unfinishedChunkSize < pc.chunkSize { - parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize) + if unfinishedChunkSize < uint64(pc.chunkSize) { + parent.subtreeSize = parent.subtreeSize - unfinishedChunkSize parent.branchCount = parent.branchCount - 1 } else { unfinishedChunkData = nil @@ -468,8 +473,8 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) { // Data is exactly one chunk.. pick the last chunk key as root chunkWG.Wait() - lastChunksKey := parent.chunk[8 : 8+pc.hashSize] - copy(pc.rootKey, lastChunksKey) + lastChunksAddress := parent.chunk[8 : 8+pc.hashSize] + copy(pc.rootAddress, lastChunksAddress) break } } else { @@ -502,7 +507,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { // No need to build the tree if the depth is 0 // or we are appending. // Just use the last key. - copy(pc.rootKey, pkey) + copy(pc.rootAddress, pkey) } else { // We need to build the tree and and provide the lonely // chunk key to replace the last tree chunk key. @@ -525,7 +530,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { workers := pc.getWorkerCount() if int64(len(pc.jobC)) > workers && workers < ChunkProcessors { pc.incrementWorkerCount() - go pc.processor(pc.workerCount) + go pc.processor(ctx, pc.workerCount) } } @@ -558,7 +563,7 @@ func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync lvlCount := int64(len(pc.chunkLevel[lvl])) if lvlCount == 1 && last { - copy(pc.rootKey, pc.chunkLevel[lvl][0].key) + copy(pc.rootAddress, pc.chunkLevel[lvl][0].key) return } |