aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/pyramid.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/pyramid.go')
-rw-r--r--swarm/storage/pyramid.go77
1 files changed, 41 insertions, 36 deletions
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
index 36ff66d04..f74eef06b 100644
--- a/swarm/storage/pyramid.go
+++ b/swarm/storage/pyramid.go
@@ -25,7 +25,7 @@ import (
"sync"
"time"
- "github.com/ethereum/go-ethereum/swarm/chunk"
+ ch "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
)
@@ -57,7 +57,7 @@ import (
When certain no of data chunks are created (defaultBranches), a signal is sent to create a tree
entry. When the level 0 tree entries reaches certain threshold (defaultBranches), another signal
is sent to a tree entry one level up.. and so on... until only the data is exhausted AND only one
- tree entry is present in certain level. The key of tree entry is given out as the rootKey of the file.
+ tree entry is present in certain level. The key of tree entry is given out as the rootAddress of the file.
*/
@@ -98,15 +98,15 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get
}
/*
- When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
+ When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Address), the root hash of the entire content will fill this once processing finishes.
New chunks to store are store using the putter which the caller provides.
*/
func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
- return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx)
+ return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, ch.DefaultSize)).Split(ctx)
}
func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
- return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx)
+ return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, ch.DefaultSize)).Append(ctx)
}
// Entry to create a tree node
@@ -153,7 +153,7 @@ type PyramidChunker struct {
wg *sync.WaitGroup
errC chan error
quitC chan bool
- rootKey []byte
+ rootAddress []byte
chunkLevel [][]*TreeEntry
}
@@ -171,14 +171,14 @@ func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) {
pc.wg = &sync.WaitGroup{}
pc.errC = make(chan error)
pc.quitC = make(chan bool)
- pc.rootKey = make([]byte, pc.hashSize)
+ pc.rootAddress = make([]byte, pc.hashSize)
pc.chunkLevel = make([][]*TreeEntry, pc.branches)
return
}
func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader {
return &LazyChunkReader{
- key: addr,
+ addr: addr,
depth: depth,
chunkSize: pc.chunkSize,
branches: pc.branches,
@@ -209,7 +209,7 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte
log.Debug("pyramid.chunker: Split()")
pc.wg.Add(1)
- pc.prepareChunks(false)
+ pc.prepareChunks(ctx, false)
// closes internal error channel if all subprocesses in the workgroup finished
go func() {
@@ -231,19 +231,21 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte
if err != nil {
return nil, nil, err
}
- case <-time.NewTimer(splitTimeout).C:
+ case <-ctx.Done():
+ _ = pc.putter.Wait(ctx) //???
+ return nil, nil, ctx.Err()
}
- return pc.rootKey, pc.putter.Wait, nil
+ return pc.rootAddress, pc.putter.Wait, nil
}
func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
log.Debug("pyramid.chunker: Append()")
// Load the right most unfinished tree chunks in every level
- pc.loadTree()
+ pc.loadTree(ctx)
pc.wg.Add(1)
- pc.prepareChunks(true)
+ pc.prepareChunks(ctx, true)
// closes internal error channel if all subprocesses in the workgroup finished
go func() {
@@ -265,11 +267,11 @@ func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(cont
case <-time.NewTimer(splitTimeout).C:
}
- return pc.rootKey, pc.putter.Wait, nil
+ return pc.rootAddress, pc.putter.Wait, nil
}
-func (pc *PyramidChunker) processor(id int64) {
+func (pc *PyramidChunker) processor(ctx context.Context, id int64) {
defer pc.decrementWorkerCount()
for {
select {
@@ -278,19 +280,22 @@ func (pc *PyramidChunker) processor(id int64) {
if !ok {
return
}
- pc.processChunk(id, job)
+ pc.processChunk(ctx, id, job)
case <-pc.quitC:
return
}
}
}
-func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) {
+func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) {
log.Debug("pyramid.chunker: processChunk()", "id", id)
- ref, err := pc.putter.Put(context.TODO(), job.chunk)
+ ref, err := pc.putter.Put(ctx, job.chunk)
if err != nil {
- pc.errC <- err
+ select {
+ case pc.errC <- err:
+ case <-pc.quitC:
+ }
}
// report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk)
@@ -300,14 +305,14 @@ func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) {
job.parentWg.Done()
}
-func (pc *PyramidChunker) loadTree() error {
+func (pc *PyramidChunker) loadTree(ctx context.Context) error {
log.Debug("pyramid.chunker: loadTree()")
// Get the root chunk to get the total size
- chunkData, err := pc.getter.Get(context.TODO(), Reference(pc.key))
+ chunkData, err := pc.getter.Get(ctx, Reference(pc.key))
if err != nil {
return errLoadingTreeRootChunk
}
- chunkSize := chunkData.Size()
+ chunkSize := int64(chunkData.Size())
log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize)
//if data size is less than a chunk... add a parent with update as pending
@@ -356,7 +361,7 @@ func (pc *PyramidChunker) loadTree() error {
branchCount = int64(len(ent.chunk)-8) / pc.hashSize
for i := int64(0); i < branchCount; i++ {
key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)]
- newChunkData, err := pc.getter.Get(context.TODO(), Reference(key))
+ newChunkData, err := pc.getter.Get(ctx, Reference(key))
if err != nil {
return errLoadingTreeChunk
}
@@ -365,7 +370,7 @@ func (pc *PyramidChunker) loadTree() error {
newEntry := &TreeEntry{
level: lvl - 1,
branchCount: bewBranchCount,
- subtreeSize: uint64(newChunkSize),
+ subtreeSize: newChunkSize,
chunk: newChunkData,
key: key,
index: 0,
@@ -385,7 +390,7 @@ func (pc *PyramidChunker) loadTree() error {
return nil
}
-func (pc *PyramidChunker) prepareChunks(isAppend bool) {
+func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) {
log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend)
defer pc.wg.Done()
@@ -393,11 +398,11 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) {
pc.incrementWorkerCount()
- go pc.processor(pc.workerCount)
+ go pc.processor(ctx, pc.workerCount)
parent := NewTreeEntry(pc)
var unfinishedChunkData ChunkData
- var unfinishedChunkSize int64
+ var unfinishedChunkSize uint64
if isAppend && len(pc.chunkLevel[0]) != 0 {
lastIndex := len(pc.chunkLevel[0]) - 1
@@ -415,16 +420,16 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) {
}
lastBranch := parent.branchCount - 1
- lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize]
+ lastAddress := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize]
var err error
- unfinishedChunkData, err = pc.getter.Get(context.TODO(), lastKey)
+ unfinishedChunkData, err = pc.getter.Get(ctx, lastAddress)
if err != nil {
pc.errC <- err
}
unfinishedChunkSize = unfinishedChunkData.Size()
- if unfinishedChunkSize < pc.chunkSize {
- parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize)
+ if unfinishedChunkSize < uint64(pc.chunkSize) {
+ parent.subtreeSize = parent.subtreeSize - unfinishedChunkSize
parent.branchCount = parent.branchCount - 1
} else {
unfinishedChunkData = nil
@@ -468,8 +473,8 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) {
if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) {
// Data is exactly one chunk.. pick the last chunk key as root
chunkWG.Wait()
- lastChunksKey := parent.chunk[8 : 8+pc.hashSize]
- copy(pc.rootKey, lastChunksKey)
+ lastChunksAddress := parent.chunk[8 : 8+pc.hashSize]
+ copy(pc.rootAddress, lastChunksAddress)
break
}
} else {
@@ -502,7 +507,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) {
// No need to build the tree if the depth is 0
// or we are appending.
// Just use the last key.
- copy(pc.rootKey, pkey)
+ copy(pc.rootAddress, pkey)
} else {
// We need to build the tree and and provide the lonely
// chunk key to replace the last tree chunk key.
@@ -525,7 +530,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) {
workers := pc.getWorkerCount()
if int64(len(pc.jobC)) > workers && workers < ChunkProcessors {
pc.incrementWorkerCount()
- go pc.processor(pc.workerCount)
+ go pc.processor(ctx, pc.workerCount)
}
}
@@ -558,7 +563,7 @@ func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync
lvlCount := int64(len(pc.chunkLevel[lvl]))
if lvlCount == 1 && last {
- copy(pc.rootKey, pc.chunkLevel[lvl][0].key)
+ copy(pc.rootAddress, pc.chunkLevel[lvl][0].key)
return
}