aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage
diff options
context:
space:
mode:
authorΞTHΞЯSPHΞЯΞ <{viktor.tron,nagydani,zsfelfoldi}@gmail.com>2016-08-30 03:18:00 +0800
committerFelix Lange <fjl@twurst.com>2016-08-31 22:19:40 +0800
commit4d300e4dece56535f56ccc32330340ce89e42581 (patch)
tree135838bfae03437eb2a50c6d66de4d66b20c3220 /swarm/storage
parent1f58b2d084b65eaec9aa2c2ecb1d3aae50d894b3 (diff)
downloaddexon-4d300e4dece56535f56ccc32330340ce89e42581.tar.gz
dexon-4d300e4dece56535f56ccc32330340ce89e42581.tar.zst
dexon-4d300e4dece56535f56ccc32330340ce89e42581.zip
swarm: plan bee for content storage and distribution on web3
This change imports the Swarm protocol codebase. Compared to the 'swarm' branch, a few mostly cosmetic changes had to be made: * The various redundant log message prefixes are gone. * All files now have LGPLv3 license headers. * Minor code changes were needed to please go vet and make the tests pass on Windows. * Further changes were required to adapt to the go-ethereum develop branch and its new Go APIs. Some code has not (yet) been brought over: * swarm/cmd/bzzhash: will reappear as cmd/bzzhash later * swarm/cmd/bzzup.sh: will be reimplemented in cmd/bzzup * swarm/cmd/makegenesis: will reappear somehow * swarm/examples/album: will move to a separate repository * swarm/examples/filemanager: ditto * swarm/examples/files: will not be merged * swarm/test/*: will not be merged * swarm/services/swear: will reappear as contracts/swear when needed
Diffstat (limited to 'swarm/storage')
-rw-r--r--swarm/storage/chunker.go509
-rw-r--r--swarm/storage/chunker_test.go303
-rw-r--r--swarm/storage/common_test.go117
-rw-r--r--swarm/storage/database.go99
-rw-r--r--swarm/storage/dbstore.go473
-rw-r--r--swarm/storage/dbstore_test.go191
-rw-r--r--swarm/storage/dpa.go239
-rw-r--r--swarm/storage/dpa_test.go144
-rw-r--r--swarm/storage/localstore.go74
-rw-r--r--swarm/storage/memstore.go316
-rw-r--r--swarm/storage/memstore_test.go50
-rw-r--r--swarm/storage/netstore.go134
-rw-r--r--swarm/storage/pyramid.go211
-rw-r--r--swarm/storage/types.go232
14 files changed, 3092 insertions, 0 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
new file mode 100644
index 000000000..4c8551da9
--- /dev/null
+++ b/swarm/storage/chunker.go
@@ -0,0 +1,509 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "hash"
+ "io"
+ "sync"
+ // "github.com/ethereum/go-ethereum/logger"
+ // "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+/*
+The distributed storage implemented in this package requires fix sized chunks of content.
+
+Chunker is the interface to a component that is responsible for disassembling and assembling larger data.
+
+TreeChunker implements a Chunker based on a tree structure defined as follows:
+
+1 each node in the tree including the root and other branching nodes are stored as a chunk.
+
+2 branching nodes encode data contents that includes the size of the dataslice covered by its entire subtree under the node as well as the hash keys of all its children :
+data_{i} := size(subtree_{i}) || key_{j} || key_{j+1} .... || key_{j+n-1}
+
+3 Leaf nodes encode an actual subslice of the input data.
+
+4 if data size is not more than maximum chunksize, the data is stored in a single chunk
+ key = hash(int64(size) + data)
+
+5 if data size is more than chunksize*branches^l, but no more than chunksize*
+ branches^(l+1), the data vector is split into slices of chunksize*
+ branches^l length (except the last one).
+ key = hash(int64(size) + key(slice0) + key(slice1) + ...)
+
+ The underlying hash function is configurable
+*/
+
+const (
+ defaultHash = "SHA3" // http://golang.org/pkg/hash/#Hash
+ // defaultHash = "SHA256" // http://golang.org/pkg/hash/#Hash
+ defaultBranches int64 = 128
+ // hashSize int64 = hasherfunc.New().Size() // hasher knows about its own length in bytes
+ // chunksize int64 = branches * hashSize // chunk is defined as this
+)
+
+/*
+Tree chunker is a concrete implementation of data chunking.
+This chunker works in a simple way, it builds a tree out of the document so that each node either represents a chunk of real data or a chunk of data representing an branching non-leaf node of the tree. In particular each such non-leaf chunk will represent is a concatenation of the hash of its respective children. This scheme simultaneously guarantees data integrity as well as self addressing. Abstract nodes are transparent since their represented size component is strictly greater than their maximum data size, since they encode a subtree.
+
+If all is well it is possible to implement this by simply composing readers so that no extra allocation or buffering is necessary for the data splitting and joining. This means that in principle there can be direct IO between : memory, file system, network socket (bzz peers storage request is read from the socket). In practice there may be need for several stages of internal buffering.
+The hashing itself does use extra copies and allocation though, since it does need it.
+*/
+
+type ChunkerParams struct {
+ Branches int64
+ Hash string
+}
+
+func NewChunkerParams() *ChunkerParams {
+ return &ChunkerParams{
+ Branches: defaultBranches,
+ Hash: defaultHash,
+ }
+}
+
+type TreeChunker struct {
+ branches int64
+ hashFunc Hasher
+ // calculated
+ hashSize int64 // self.hashFunc.New().Size()
+ chunkSize int64 // hashSize* branches
+ workerCount int
+}
+
+func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
+ self = &TreeChunker{}
+ self.hashFunc = MakeHashFunc(params.Hash)
+ self.branches = params.Branches
+ self.hashSize = int64(self.hashFunc().Size())
+ self.chunkSize = self.hashSize * self.branches
+ self.workerCount = 1
+ return
+}
+
+// func (self *TreeChunker) KeySize() int64 {
+// return self.hashSize
+// }
+
+// String() for pretty printing
+func (self *Chunk) String() string {
+ return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", self.Key.Log(), self.Size, len(self.SData))
+}
+
+type hashJob struct {
+ key Key
+ chunk []byte
+ size int64
+ parentWg *sync.WaitGroup
+}
+
+func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+
+ if self.chunkSize <= 0 {
+ panic("chunker must be initialised")
+ }
+
+ jobC := make(chan *hashJob, 2*processors)
+ wg := &sync.WaitGroup{}
+ errC := make(chan error)
+
+ // wwg = workers waitgroup keeps track of hashworkers spawned by this split call
+ if wwg != nil {
+ wwg.Add(1)
+ }
+ go self.hashWorker(jobC, chunkC, errC, swg, wwg)
+
+ depth := 0
+ treeSize := self.chunkSize
+
+ // takes lowest depth such that chunksize*HashCount^(depth+1) > size
+ // power series, will find the order of magnitude of the data size in base hashCount or numbers of levels of branching in the resulting tree.
+ for ; treeSize < size; treeSize *= self.branches {
+ depth++
+ }
+
+ key := make([]byte, self.hashFunc().Size())
+ // glog.V(logger.Detail).Infof("split request received for data (%v bytes, depth: %v)", size, depth)
+ // this waitgroup member is released after the root hash is calculated
+ wg.Add(1)
+ //launch actual recursive function passing the waitgroups
+ go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, wg, swg, wwg)
+
+ // closes internal error channel if all subprocesses in the workgroup finished
+ go func() {
+ // waiting for all threads to finish
+ wg.Wait()
+ // if storage waitgroup is non-nil, we wait for storage to finish too
+ if swg != nil {
+ // glog.V(logger.Detail).Infof("Waiting for storage to finish")
+ swg.Wait()
+ }
+ close(errC)
+ }()
+
+ select {
+ case err := <-errC:
+ if err != nil {
+ return nil, err
+ }
+ //
+ }
+ return key, nil
+}
+
+func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, parentWg, swg, wwg *sync.WaitGroup) {
+
+ for depth > 0 && size < treeSize {
+ treeSize /= self.branches
+ depth--
+ }
+
+ if depth == 0 {
+ // leaf nodes -> content chunks
+ chunkData := make([]byte, size+8)
+ binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size))
+ data.Read(chunkData[8:])
+ select {
+ case jobC <- &hashJob{key, chunkData, size, parentWg}:
+ case <-errC:
+ }
+ // glog.V(logger.Detail).Infof("read %v", size)
+ return
+ }
+ // intermediate chunk containing child nodes hashes
+ branchCnt := int64((size + treeSize - 1) / treeSize)
+ // glog.V(logger.Detail).Infof("intermediate node: setting branches: %v, depth: %v, max subtree size: %v, data size: %v", branches, depth, treeSize, size)
+
+ var chunk []byte = make([]byte, branchCnt*self.hashSize+8)
+ var pos, i int64
+
+ binary.LittleEndian.PutUint64(chunk[0:8], uint64(size))
+
+ childrenWg := &sync.WaitGroup{}
+ var secSize int64
+ for i < branchCnt {
+ // the last item can have shorter data
+ if size-pos < treeSize {
+ secSize = size - pos
+ } else {
+ secSize = treeSize
+ }
+ // the hash of that data
+ subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize]
+
+ childrenWg.Add(1)
+ self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, childrenWg, swg, wwg)
+
+ i++
+ pos += treeSize
+ }
+ // wait for all the children to complete calculating their hashes and copying them onto sections of the chunk
+ // parentWg.Add(1)
+ // go func() {
+ childrenWg.Wait()
+ if len(jobC) > self.workerCount && self.workerCount < processors {
+ if wwg != nil {
+ wwg.Add(1)
+ }
+ self.workerCount++
+ go self.hashWorker(jobC, chunkC, errC, swg, wwg)
+ }
+ select {
+ case jobC <- &hashJob{key, chunk, size, parentWg}:
+ case <-errC:
+ }
+}
+
+func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, swg, wwg *sync.WaitGroup) {
+ hasher := self.hashFunc()
+ if wwg != nil {
+ defer wwg.Done()
+ }
+ for {
+ select {
+
+ case job, ok := <-jobC:
+ if !ok {
+ return
+ }
+ // now we got the hashes in the chunk, then hash the chunks
+ hasher.Reset()
+ self.hashChunk(hasher, job, chunkC, swg)
+ // glog.V(logger.Detail).Infof("hash chunk (%v)", job.size)
+ case <-errC:
+ return
+ }
+ }
+}
+
+// The treeChunkers own Hash hashes together
+// - the size (of the subtree encoded in the Chunk)
+// - the Chunk, ie. the contents read from the input reader
+func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
+ hasher.Write(job.chunk)
+ h := hasher.Sum(nil)
+ newChunk := &Chunk{
+ Key: h,
+ SData: job.chunk,
+ Size: job.size,
+ wg: swg,
+ }
+
+ // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk)
+ copy(job.key, h)
+ // send off new chunk to storage
+ if chunkC != nil {
+ if swg != nil {
+ swg.Add(1)
+ }
+ }
+ job.parentWg.Done()
+ if chunkC != nil {
+ chunkC <- newChunk
+ }
+}
+
+// LazyChunkReader implements LazySectionReader
+type LazyChunkReader struct {
+ key Key // root key
+ chunkC chan *Chunk // chunk channel to send retrieve requests on
+ chunk *Chunk // size of the entire subtree
+ off int64 // offset
+ chunkSize int64 // inherit from chunker
+ branches int64 // inherit from chunker
+ hashSize int64 // inherit from chunker
+}
+
+// implements the Joiner interface
+func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
+
+ return &LazyChunkReader{
+ key: key,
+ chunkC: chunkC,
+ chunkSize: self.chunkSize,
+ branches: self.branches,
+ hashSize: self.hashSize,
+ }
+}
+
+// Size is meant to be called on the LazySectionReader
+func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) {
+ if self.chunk != nil {
+ return self.chunk.Size, nil
+ }
+ chunk := retrieve(self.key, self.chunkC, quitC)
+ if chunk == nil {
+ select {
+ case <-quitC:
+ return 0, errors.New("aborted")
+ default:
+ return 0, fmt.Errorf("root chunk not found for %v", self.key.Hex())
+ }
+ }
+ self.chunk = chunk
+ return chunk.Size, nil
+}
+
+// read at can be called numerous times
+// concurrent reads are allowed
+// Size() needs to be called synchronously on the LazyChunkReader first
+func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
+ // this is correct, a swarm doc cannot be zero length, so no EOF is expected
+ if len(b) == 0 {
+ // glog.V(logger.Detail).Infof("Size query for %v", chunk.Key.Log())
+ return 0, nil
+ }
+ quitC := make(chan bool)
+ size, err := self.Size(quitC)
+ if err != nil {
+ return 0, err
+ }
+ // glog.V(logger.Detail).Infof("readAt: len(b): %v, off: %v, size: %v ", len(b), off, size)
+
+ errC := make(chan error)
+ // glog.V(logger.Detail).Infof("readAt: reading %v into %d bytes at offset %d.", self.chunk.Key.Log(), len(b), off)
+
+ // }
+ // glog.V(logger.Detail).Infof("-> want: %v, off: %v size: %v ", want, off, self.size)
+ var treeSize int64
+ var depth int
+ // calculate depth and max treeSize
+ treeSize = self.chunkSize
+ for ; treeSize < size; treeSize *= self.branches {
+ depth++
+ }
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go self.join(b, off, off+int64(len(b)), depth, treeSize/self.branches, self.chunk, &wg, errC, quitC)
+ go func() {
+ wg.Wait()
+ close(errC)
+ }()
+
+ err = <-errC
+ if err != nil {
+ close(quitC)
+
+ return 0, err
+ }
+ // glog.V(logger.Detail).Infof("ReadAt received %v", err)
+ // glog.V(logger.Detail).Infof("end: len(b): %v, off: %v, size: %v ", len(b), off, size)
+ if off+int64(len(b)) >= size {
+ // glog.V(logger.Detail).Infof(" len(b): %v EOF", len(b))
+ return len(b), io.EOF
+ }
+ // glog.V(logger.Detail).Infof("ReadAt returning at %d: %v", read, err)
+ return len(b), nil
+}
+
+func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
+ defer parentWg.Done()
+ // return NewDPA(&LocalStore{})
+ // glog.V(logger.Detail).Infof("inh len(b): %v, off: %v eoff: %v ", len(b), off, eoff)
+
+ // glog.V(logger.Detail).Infof("depth: %v, loff: %v, eoff: %v, chunk.Size: %v, treeSize: %v", depth, off, eoff, chunk.Size, treeSize)
+
+ // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+
+ // find appropriate block level
+ for chunk.Size < treeSize && depth > 0 {
+ treeSize /= self.branches
+ depth--
+ }
+
+ // leaf chunk found
+ if depth == 0 {
+ // glog.V(logger.Detail).Infof("depth: %v, len(b): %v, off: %v, eoff: %v, chunk.Size: %v %v, treeSize: %v", depth, len(b), off, eoff, chunk.Size, len(chunk.SData), treeSize)
+ extra := 8 + eoff - int64(len(chunk.SData))
+ if extra > 0 {
+ eoff -= extra
+ }
+ copy(b, chunk.SData[8+off:8+eoff])
+ return // simply give back the chunks reader for content chunks
+ }
+
+ // subtree
+ start := off / treeSize
+ end := (eoff + treeSize - 1) / treeSize
+
+ wg := &sync.WaitGroup{}
+ defer wg.Wait()
+ // glog.V(logger.Detail).Infof("start %v,end %v", start, end)
+
+ for i := start; i < end; i++ {
+ soff := i * treeSize
+ roff := soff
+ seoff := soff + treeSize
+
+ if soff < off {
+ soff = off
+ }
+ if seoff > eoff {
+ seoff = eoff
+ }
+ if depth > 1 {
+ wg.Wait()
+ }
+ wg.Add(1)
+ go func(j int64) {
+ childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize]
+ // glog.V(logger.Detail).Infof("subtree ind.ex: %v -> %v", j, childKey.Log())
+ chunk := retrieve(childKey, self.chunkC, quitC)
+ if chunk == nil {
+ select {
+ case errC <- fmt.Errorf("chunk %v-%v not found", off, off+treeSize):
+ case <-quitC:
+ }
+ return
+ }
+ if soff < off {
+ soff = off
+ }
+ self.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/self.branches, chunk, wg, errC, quitC)
+ }(i)
+ } //for
+}
+
+// the helper method submits chunks for a key to a oueue (DPA) and
+// block until they time out or arrive
+// abort if quitC is readable
+func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
+ chunk := &Chunk{
+ Key: key,
+ C: make(chan bool), // close channel to signal data delivery
+ }
+ // glog.V(logger.Detail).Infof("chunk data sent for %v (key interval in chunk %v-%v)", ch.Key.Log(), j*self.chunker.hashSize, (j+1)*self.chunker.hashSize)
+ // submit chunk for retrieval
+ select {
+ case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally)
+ case <-quitC:
+ return nil
+ }
+ // waiting for the chunk retrieval
+ select { // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+
+ case <-quitC:
+ // this is how we control process leakage (quitC is closed once join is finished (after timeout))
+ return nil
+ case <-chunk.C: // bells are ringing, data have been delivered
+ // glog.V(logger.Detail).Infof("chunk data received")
+ }
+ if len(chunk.SData) == 0 {
+ return nil // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+
+ }
+ return chunk
+}
+
+// Read keeps a cursor so cannot be called simulateously, see ReadAt
+func (self *LazyChunkReader) Read(b []byte) (read int, err error) {
+ read, err = self.ReadAt(b, self.off)
+ // glog.V(logger.Detail).Infof("read: %v, off: %v, error: %v", read, self.off, err)
+
+ self.off += int64(read)
+ return
+}
+
+// completely analogous to standard SectionReader implementation
+var errWhence = errors.New("Seek: invalid whence")
+var errOffset = errors.New("Seek: invalid offset")
+
+func (s *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
+ switch whence {
+ default:
+ return 0, errWhence
+ case 0:
+ offset += 0
+ case 1:
+ offset += s.off
+ case 2:
+ if s.chunk == nil {
+ return 0, fmt.Errorf("seek from the end requires rootchunk for size. call Size first")
+ }
+ offset += s.chunk.Size
+ }
+
+ if offset < 0 {
+ return 0, errOffset
+ }
+ s.off = offset
+ return offset, nil
+}
diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go
new file mode 100644
index 000000000..e6ca3d087
--- /dev/null
+++ b/swarm/storage/chunker_test.go
@@ -0,0 +1,303 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+)
+
+/*
+Tests TreeChunker by splitting and joining a random byte slice
+*/
+
+type test interface {
+ Fatalf(string, ...interface{})
+ Logf(string, ...interface{})
+}
+
+type chunkerTester struct {
+ inputs map[uint64][]byte
+ chunks map[string]*Chunk
+ t test
+}
+
+func (self *chunkerTester) checkChunks(t *testing.T, want int) {
+ l := len(self.chunks)
+ if l != want {
+ t.Errorf("expected %v chunks, got %v", want, l)
+ }
+}
+
+func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup) (key Key) {
+ // reset
+ self.chunks = make(map[string]*Chunk)
+
+ if self.inputs == nil {
+ self.inputs = make(map[uint64][]byte)
+ }
+
+ quitC := make(chan bool)
+ timeout := time.After(600 * time.Second)
+ if chunkC != nil {
+ go func() {
+ for {
+ select {
+ case <-timeout:
+ self.t.Fatalf("Join timeout error")
+
+ case chunk, ok := <-chunkC:
+ if !ok {
+ // glog.V(logger.Info).Infof("chunkC closed quitting")
+ close(quitC)
+ return
+ }
+ // glog.V(logger.Info).Infof("chunk %v received", len(self.chunks))
+ // self.chunks = append(self.chunks, chunk)
+ self.chunks[chunk.Key.String()] = chunk
+ if chunk.wg != nil {
+ chunk.wg.Done()
+ }
+ }
+ }
+ }()
+ }
+ key, err := chunker.Split(data, size, chunkC, swg, nil)
+ if err != nil {
+ self.t.Fatalf("Split error: %v", err)
+ }
+ if chunkC != nil {
+ if swg != nil {
+ // glog.V(logger.Info).Infof("Waiting for storage to finish")
+ swg.Wait()
+ // glog.V(logger.Info).Infof("Storage finished")
+ }
+ close(chunkC)
+ }
+ if chunkC != nil {
+ // glog.V(logger.Info).Infof("waiting for splitter finished")
+ <-quitC
+ // glog.V(logger.Info).Infof("Splitter finished")
+ }
+ return
+}
+
+func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader {
+ // reset but not the chunks
+
+ // glog.V(logger.Info).Infof("Splitter finished")
+ reader := chunker.Join(key, chunkC)
+
+ timeout := time.After(600 * time.Second)
+ // glog.V(logger.Info).Infof("Splitter finished")
+ i := 0
+ go func() {
+ for {
+ select {
+ case <-timeout:
+ self.t.Fatalf("Join timeout error")
+
+ case chunk, ok := <-chunkC:
+ if !ok {
+ close(quitC)
+ return
+ }
+ // glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String())
+ // this just mocks the behaviour of a chunk store retrieval
+ stored, success := self.chunks[chunk.Key.String()]
+ // glog.V(logger.Info).Infof("chunk %v, success: %v", chunk.Key.String(), success)
+ if !success {
+ self.t.Fatalf("not found")
+ return
+ }
+ // glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String())
+ chunk.SData = stored.SData
+ chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+ close(chunk.C)
+ i++
+ }
+ }
+ }()
+ return reader
+}
+
+func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
+ if tester.inputs == nil {
+ tester.inputs = make(map[uint64][]byte)
+ }
+ input, found := tester.inputs[uint64(n)]
+ var data io.Reader
+ if !found {
+ data, input = testDataReaderAndSlice(n)
+ tester.inputs[uint64(n)] = input
+ } else {
+ data = limitReader(bytes.NewReader(input), n)
+ }
+
+ chunkC := make(chan *Chunk, 1000)
+ swg := &sync.WaitGroup{}
+
+ key := tester.Split(splitter, data, int64(n), chunkC, swg)
+ tester.t.Logf(" Key = %v\n", key)
+
+ chunkC = make(chan *Chunk, 1000)
+ quitC := make(chan bool)
+
+ chunker := NewTreeChunker(NewChunkerParams())
+ reader := tester.Join(chunker, key, 0, chunkC, quitC)
+ output := make([]byte, n)
+ // glog.V(logger.Info).Infof(" Key = %v\n", key)
+ r, err := reader.Read(output)
+ // glog.V(logger.Info).Infof(" read = %v %v\n", r, err)
+ if r != n || err != io.EOF {
+ tester.t.Fatalf("read error read: %v n = %v err = %v\n", r, n, err)
+ }
+ if input != nil {
+ if !bytes.Equal(output, input) {
+ tester.t.Fatalf("input and output mismatch\n IN: %v\nOUT: %v\n", input, output)
+ }
+ }
+ close(chunkC)
+ <-quitC
+}
+
+func TestRandomData(t *testing.T) {
+ // sizes := []int{123456}
+ sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 123456}
+ tester := &chunkerTester{t: t}
+ chunker := NewTreeChunker(NewChunkerParams())
+ for _, s := range sizes {
+ testRandomData(chunker, s, tester)
+ }
+ pyramid := NewPyramidChunker(NewChunkerParams())
+ for _, s := range sizes {
+ testRandomData(pyramid, s, tester)
+ }
+}
+
+func readAll(reader LazySectionReader, result []byte) {
+ size := int64(len(result))
+
+ var end int64
+ for pos := int64(0); pos < size; pos += 1000 {
+ if pos+1000 > size {
+ end = size
+ } else {
+ end = pos + 1000
+ }
+ reader.ReadAt(result[pos:end], pos)
+ }
+}
+
+func benchReadAll(reader LazySectionReader) {
+ size, _ := reader.Size(nil)
+ output := make([]byte, 1000)
+ for pos := int64(0); pos < size; pos += 1000 {
+ reader.ReadAt(output, pos)
+ }
+}
+
+func benchmarkJoin(n int, t *testing.B) {
+ t.ReportAllocs()
+ for i := 0; i < t.N; i++ {
+ chunker := NewTreeChunker(NewChunkerParams())
+ tester := &chunkerTester{t: t}
+ data := testDataReader(n)
+
+ chunkC := make(chan *Chunk, 1000)
+ swg := &sync.WaitGroup{}
+
+ key := tester.Split(chunker, data, int64(n), chunkC, swg)
+ // t.StartTimer()
+ chunkC = make(chan *Chunk, 1000)
+ quitC := make(chan bool)
+ reader := tester.Join(chunker, key, i, chunkC, quitC)
+ benchReadAll(reader)
+ close(chunkC)
+ <-quitC
+ // t.StopTimer()
+ }
+ stats := new(runtime.MemStats)
+ runtime.ReadMemStats(stats)
+ fmt.Println(stats.Sys)
+}
+
+func benchmarkSplitTree(n int, t *testing.B) {
+ t.ReportAllocs()
+ for i := 0; i < t.N; i++ {
+ chunker := NewTreeChunker(NewChunkerParams())
+ tester := &chunkerTester{t: t}
+ data := testDataReader(n)
+ // glog.V(logger.Info).Infof("splitting data of length %v", n)
+ tester.Split(chunker, data, int64(n), nil, nil)
+ }
+ stats := new(runtime.MemStats)
+ runtime.ReadMemStats(stats)
+ fmt.Println(stats.Sys)
+}
+
+func benchmarkSplitPyramid(n int, t *testing.B) {
+ t.ReportAllocs()
+ for i := 0; i < t.N; i++ {
+ splitter := NewPyramidChunker(NewChunkerParams())
+ tester := &chunkerTester{t: t}
+ data := testDataReader(n)
+ // glog.V(logger.Info).Infof("splitting data of length %v", n)
+ tester.Split(splitter, data, int64(n), nil, nil)
+ }
+ stats := new(runtime.MemStats)
+ runtime.ReadMemStats(stats)
+ fmt.Println(stats.Sys)
+}
+
+func BenchmarkJoin_2(t *testing.B) { benchmarkJoin(100, t) }
+func BenchmarkJoin_3(t *testing.B) { benchmarkJoin(1000, t) }
+func BenchmarkJoin_4(t *testing.B) { benchmarkJoin(10000, t) }
+func BenchmarkJoin_5(t *testing.B) { benchmarkJoin(100000, t) }
+func BenchmarkJoin_6(t *testing.B) { benchmarkJoin(1000000, t) }
+func BenchmarkJoin_7(t *testing.B) { benchmarkJoin(10000000, t) }
+func BenchmarkJoin_8(t *testing.B) { benchmarkJoin(100000000, t) }
+
+func BenchmarkSplitTree_2(t *testing.B) { benchmarkSplitTree(100, t) }
+func BenchmarkSplitTree_2h(t *testing.B) { benchmarkSplitTree(500, t) }
+func BenchmarkSplitTree_3(t *testing.B) { benchmarkSplitTree(1000, t) }
+func BenchmarkSplitTree_3h(t *testing.B) { benchmarkSplitTree(5000, t) }
+func BenchmarkSplitTree_4(t *testing.B) { benchmarkSplitTree(10000, t) }
+func BenchmarkSplitTree_4h(t *testing.B) { benchmarkSplitTree(50000, t) }
+func BenchmarkSplitTree_5(t *testing.B) { benchmarkSplitTree(100000, t) }
+func BenchmarkSplitTree_6(t *testing.B) { benchmarkSplitTree(1000000, t) }
+func BenchmarkSplitTree_7(t *testing.B) { benchmarkSplitTree(10000000, t) }
+func BenchmarkSplitTree_8(t *testing.B) { benchmarkSplitTree(100000000, t) }
+
+func BenchmarkSplitPyramid_2(t *testing.B) { benchmarkSplitPyramid(100, t) }
+func BenchmarkSplitPyramid_2h(t *testing.B) { benchmarkSplitPyramid(500, t) }
+func BenchmarkSplitPyramid_3(t *testing.B) { benchmarkSplitPyramid(1000, t) }
+func BenchmarkSplitPyramid_3h(t *testing.B) { benchmarkSplitPyramid(5000, t) }
+func BenchmarkSplitPyramid_4(t *testing.B) { benchmarkSplitPyramid(10000, t) }
+func BenchmarkSplitPyramid_4h(t *testing.B) { benchmarkSplitPyramid(50000, t) }
+func BenchmarkSplitPyramid_5(t *testing.B) { benchmarkSplitPyramid(100000, t) }
+func BenchmarkSplitPyramid_6(t *testing.B) { benchmarkSplitPyramid(1000000, t) }
+func BenchmarkSplitPyramid_7(t *testing.B) { benchmarkSplitPyramid(10000000, t) }
+func BenchmarkSplitPyramid_8(t *testing.B) { benchmarkSplitPyramid(100000000, t) }
+
+// godep go test -bench ./swarm/storage -cpuprofile cpu.out -memprofile mem.out
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
new file mode 100644
index 000000000..e81a82b7b
--- /dev/null
+++ b/swarm/storage/common_test.go
@@ -0,0 +1,117 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "bytes"
+ "crypto/rand"
+ "io"
+ "sync"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+type limitedReader struct {
+ r io.Reader
+ off int64
+ size int64
+}
+
+func limitReader(r io.Reader, size int) *limitedReader {
+ return &limitedReader{r, 0, int64(size)}
+}
+
+func (self *limitedReader) Read(buf []byte) (int, error) {
+ limit := int64(len(buf))
+ left := self.size - self.off
+ if limit >= left {
+ limit = left
+ }
+ n, err := self.r.Read(buf[:limit])
+ if err == nil && limit == left {
+ err = io.EOF
+ }
+ self.off += int64(n)
+ return n, err
+}
+
+func testDataReader(l int) (r io.Reader) {
+ return limitReader(rand.Reader, l)
+}
+
+func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) {
+ slice = make([]byte, l)
+ if _, err := rand.Read(slice); err != nil {
+ panic("rand error")
+ }
+ r = limitReader(bytes.NewReader(slice), l)
+ return
+}
+
+func testStore(m ChunkStore, l int64, branches int64, t *testing.T) {
+
+ chunkC := make(chan *Chunk)
+ go func() {
+ for chunk := range chunkC {
+ m.Put(chunk)
+ if chunk.wg != nil {
+ chunk.wg.Done()
+ }
+ }
+ }()
+ chunker := NewTreeChunker(&ChunkerParams{
+ Branches: branches,
+ Hash: defaultHash,
+ })
+ swg := &sync.WaitGroup{}
+ key, err := chunker.Split(rand.Reader, l, chunkC, swg, nil)
+ swg.Wait()
+ close(chunkC)
+ chunkC = make(chan *Chunk)
+
+ quit := make(chan bool)
+
+ go func() {
+ for ch := range chunkC {
+ go func(chunk *Chunk) {
+ storedChunk, err := m.Get(chunk.Key)
+ if err == notFound {
+ glog.V(logger.Detail).Infof("chunk '%v' not found", chunk.Key.Log())
+ } else if err != nil {
+ glog.V(logger.Detail).Infof("error retrieving chunk %v: %v", chunk.Key.Log(), err)
+ } else {
+ chunk.SData = storedChunk.SData
+ chunk.Size = storedChunk.Size
+ }
+ glog.V(logger.Detail).Infof("chunk '%v' not found", chunk.Key.Log())
+ close(chunk.C)
+ }(ch)
+ }
+ close(quit)
+ }()
+ r := chunker.Join(key, chunkC)
+
+ b := make([]byte, l)
+ n, err := r.ReadAt(b, 0)
+ if err != io.EOF {
+ t.Fatalf("read error (%v/%v) %v", n, l, err)
+ }
+ close(chunkC)
+ <-quit
+}
diff --git a/swarm/storage/database.go b/swarm/storage/database.go
new file mode 100644
index 000000000..2532490cc
--- /dev/null
+++ b/swarm/storage/database.go
@@ -0,0 +1,99 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+// this is a clone of an earlier state of the ethereum ethdb/database
+// no need for queueing/caching
+
+import (
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/compression/rle"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+const openFileLimit = 128
+
+type LDBDatabase struct {
+ db *leveldb.DB
+ comp bool
+}
+
+func NewLDBDatabase(file string) (*LDBDatabase, error) {
+ // Open the db
+ db, err := leveldb.OpenFile(file, &opt.Options{OpenFilesCacheCapacity: openFileLimit})
+ if err != nil {
+ return nil, err
+ }
+
+ database := &LDBDatabase{db: db, comp: false}
+
+ return database, nil
+}
+
+func (self *LDBDatabase) Put(key []byte, value []byte) {
+ if self.comp {
+ value = rle.Compress(value)
+ }
+
+ err := self.db.Put(key, value, nil)
+ if err != nil {
+ fmt.Println("Error put", err)
+ }
+}
+
+func (self *LDBDatabase) Get(key []byte) ([]byte, error) {
+ dat, err := self.db.Get(key, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ if self.comp {
+ return rle.Decompress(dat)
+ }
+
+ return dat, nil
+}
+
+func (self *LDBDatabase) Delete(key []byte) error {
+ return self.db.Delete(key, nil)
+}
+
+func (self *LDBDatabase) LastKnownTD() []byte {
+ data, _ := self.Get([]byte("LTD"))
+
+ if len(data) == 0 {
+ data = []byte{0x0}
+ }
+
+ return data
+}
+
+func (self *LDBDatabase) NewIterator() iterator.Iterator {
+ return self.db.NewIterator(nil, nil)
+}
+
+func (self *LDBDatabase) Write(batch *leveldb.Batch) error {
+ return self.db.Write(batch, nil)
+}
+
+func (self *LDBDatabase) Close() {
+ // Close the leveldb database
+ self.db.Close()
+}
diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go
new file mode 100644
index 000000000..5ecc5c500
--- /dev/null
+++ b/swarm/storage/dbstore.go
@@ -0,0 +1,473 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// disk storage layer for the package bzz
+// DbStore implements the ChunkStore interface and is used by the DPA as
+// persistent storage of chunks
+// it implements purging based on access count allowing for external control of
+// max capacity
+
+package storage
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+const (
+ defaultDbCapacity = 5000000
+ defaultRadius = 0 // not yet used
+
+ gcArraySize = 10000
+ gcArrayFreeRatio = 0.1
+
+ // key prefixes for leveldb storage
+ kpIndex = 0
+ kpData = 1
+)
+
+var (
+ keyAccessCnt = []byte{2}
+ keyEntryCnt = []byte{3}
+ keyDataIdx = []byte{4}
+ keyGCPos = []byte{5}
+)
+
+type gcItem struct {
+ idx uint64
+ value uint64
+ idxKey []byte
+}
+
+type DbStore struct {
+ db *LDBDatabase
+
+ // this should be stored in db, accessed transactionally
+ entryCnt, accessCnt, dataIdx, capacity uint64
+
+ gcPos, gcStartPos []byte
+ gcArray []*gcItem
+
+ hashfunc Hasher
+
+ lock sync.Mutex
+}
+
+func NewDbStore(path string, hash Hasher, capacity uint64, radius int) (s *DbStore, err error) {
+ s = new(DbStore)
+
+ s.hashfunc = hash
+
+ s.db, err = NewLDBDatabase(path)
+ if err != nil {
+ return
+ }
+
+ s.setCapacity(capacity)
+
+ s.gcStartPos = make([]byte, 1)
+ s.gcStartPos[0] = kpIndex
+ s.gcArray = make([]*gcItem, gcArraySize)
+
+ data, _ := s.db.Get(keyEntryCnt)
+ s.entryCnt = BytesToU64(data)
+ data, _ = s.db.Get(keyAccessCnt)
+ s.accessCnt = BytesToU64(data)
+ data, _ = s.db.Get(keyDataIdx)
+ s.dataIdx = BytesToU64(data)
+ s.gcPos, _ = s.db.Get(keyGCPos)
+ if s.gcPos == nil {
+ s.gcPos = s.gcStartPos
+ }
+ return
+}
+
+type dpaDBIndex struct {
+ Idx uint64
+ Access uint64
+}
+
+func BytesToU64(data []byte) uint64 {
+ if len(data) < 8 {
+ return 0
+ }
+ return binary.LittleEndian.Uint64(data)
+}
+
+func U64ToBytes(val uint64) []byte {
+ data := make([]byte, 8)
+ binary.LittleEndian.PutUint64(data, val)
+ return data
+}
+
+func getIndexGCValue(index *dpaDBIndex) uint64 {
+ return index.Access
+}
+
+func (s *DbStore) updateIndexAccess(index *dpaDBIndex) {
+ index.Access = s.accessCnt
+}
+
+func getIndexKey(hash Key) []byte {
+ HashSize := len(hash)
+ key := make([]byte, HashSize+1)
+ key[0] = 0
+ copy(key[1:], hash[:])
+ return key
+}
+
+func getDataKey(idx uint64) []byte {
+ key := make([]byte, 9)
+ key[0] = 1
+ binary.BigEndian.PutUint64(key[1:9], idx)
+
+ return key
+}
+
+func encodeIndex(index *dpaDBIndex) []byte {
+ data, _ := rlp.EncodeToBytes(index)
+ return data
+}
+
+func encodeData(chunk *Chunk) []byte {
+ return chunk.SData
+}
+
+func decodeIndex(data []byte, index *dpaDBIndex) {
+ dec := rlp.NewStream(bytes.NewReader(data), 0)
+ dec.Decode(index)
+}
+
+func decodeData(data []byte, chunk *Chunk) {
+ chunk.SData = data
+ chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8]))
+}
+
+func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int {
+ pivotValue := list[pivotIndex].value
+ dd := list[pivotIndex]
+ list[pivotIndex] = list[right]
+ list[right] = dd
+ storeIndex := left
+ for i := left; i < right; i++ {
+ if list[i].value < pivotValue {
+ dd = list[storeIndex]
+ list[storeIndex] = list[i]
+ list[i] = dd
+ storeIndex++
+ }
+ }
+ dd = list[storeIndex]
+ list[storeIndex] = list[right]
+ list[right] = dd
+ return storeIndex
+}
+
+func gcListSelect(list []*gcItem, left int, right int, n int) int {
+ if left == right {
+ return left
+ }
+ pivotIndex := (left + right) / 2
+ pivotIndex = gcListPartition(list, left, right, pivotIndex)
+ if n == pivotIndex {
+ return n
+ } else {
+ if n < pivotIndex {
+ return gcListSelect(list, left, pivotIndex-1, n)
+ } else {
+ return gcListSelect(list, pivotIndex+1, right, n)
+ }
+ }
+}
+
+func (s *DbStore) collectGarbage(ratio float32) {
+ it := s.db.NewIterator()
+ it.Seek(s.gcPos)
+ if it.Valid() {
+ s.gcPos = it.Key()
+ } else {
+ s.gcPos = nil
+ }
+ gcnt := 0
+
+ for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) {
+
+ if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
+ it.Seek(s.gcStartPos)
+ if it.Valid() {
+ s.gcPos = it.Key()
+ } else {
+ s.gcPos = nil
+ }
+ }
+
+ if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
+ break
+ }
+
+ gci := new(gcItem)
+ gci.idxKey = s.gcPos
+ var index dpaDBIndex
+ decodeIndex(it.Value(), &index)
+ gci.idx = index.Idx
+ // the smaller, the more likely to be gc'd
+ gci.value = getIndexGCValue(&index)
+ s.gcArray[gcnt] = gci
+ gcnt++
+ it.Next()
+ if it.Valid() {
+ s.gcPos = it.Key()
+ } else {
+ s.gcPos = nil
+ }
+ }
+ it.Release()
+
+ cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio))
+ cutval := s.gcArray[cutidx].value
+
+ // fmt.Print(gcnt, " ", s.entryCnt, " ")
+
+ // actual gc
+ for i := 0; i < gcnt; i++ {
+ if s.gcArray[i].value <= cutval {
+ batch := new(leveldb.Batch)
+ batch.Delete(s.gcArray[i].idxKey)
+ batch.Delete(getDataKey(s.gcArray[i].idx))
+ s.entryCnt--
+ batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+ s.db.Write(batch)
+ }
+ }
+
+ // fmt.Println(s.entryCnt)
+
+ s.db.Put(keyGCPos, s.gcPos)
+}
+
+func (s *DbStore) Counter() uint64 {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ return s.dataIdx
+}
+
+func (s *DbStore) Put(chunk *Chunk) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ ikey := getIndexKey(chunk.Key)
+ var index dpaDBIndex
+
+ if s.tryAccessIdx(ikey, &index) {
+ if chunk.dbStored != nil {
+ close(chunk.dbStored)
+ }
+ return // already exists, only update access
+ }
+
+ data := encodeData(chunk)
+ //data := ethutil.Encode([]interface{}{entry})
+
+ if s.entryCnt >= s.capacity {
+ s.collectGarbage(gcArrayFreeRatio)
+ }
+
+ batch := new(leveldb.Batch)
+
+ batch.Put(getDataKey(s.dataIdx), data)
+
+ index.Idx = s.dataIdx
+ s.updateIndexAccess(&index)
+
+ idata := encodeIndex(&index)
+ batch.Put(ikey, idata)
+
+ batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
+ s.entryCnt++
+ batch.Put(keyDataIdx, U64ToBytes(s.dataIdx))
+ s.dataIdx++
+ batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
+ s.accessCnt++
+
+ s.db.Write(batch)
+ if chunk.dbStored != nil {
+ close(chunk.dbStored)
+ }
+ glog.V(logger.Detail).Infof("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx)
+}
+
+// try to find index; if found, update access cnt and return true
+func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
+ idata, err := s.db.Get(ikey)
+ if err != nil {
+ return false
+ }
+ decodeIndex(idata, index)
+
+ batch := new(leveldb.Batch)
+
+ batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
+ s.accessCnt++
+ s.updateIndexAccess(index)
+ idata = encodeIndex(index)
+ batch.Put(ikey, idata)
+
+ s.db.Write(batch)
+
+ return true
+}
+
+func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ var index dpaDBIndex
+
+ if s.tryAccessIdx(getIndexKey(key), &index) {
+ var data []byte
+ data, err = s.db.Get(getDataKey(index.Idx))
+ if err != nil {
+ return
+ }
+
+ hasher := s.hashfunc()
+ hasher.Write(data)
+ hash := hasher.Sum(nil)
+ if bytes.Compare(hash, key) != 0 {
+ s.db.Delete(getDataKey(index.Idx))
+ err = fmt.Errorf("invalid chunk. hash=%x, key=%v", hash, key[:])
+ return
+ }
+
+ chunk = &Chunk{
+ Key: key,
+ }
+ decodeData(data, chunk)
+ } else {
+ err = notFound
+ }
+
+ return
+
+}
+
+func (s *DbStore) updateAccessCnt(key Key) {
+
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ var index dpaDBIndex
+ s.tryAccessIdx(getIndexKey(key), &index) // result_chn == nil, only update access cnt
+
+}
+
+func (s *DbStore) setCapacity(c uint64) {
+
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.capacity = c
+
+ if s.entryCnt > c {
+ var ratio float32
+ ratio = float32(1.01) - float32(c)/float32(s.entryCnt)
+ if ratio < gcArrayFreeRatio {
+ ratio = gcArrayFreeRatio
+ }
+ if ratio > 1 {
+ ratio = 1
+ }
+ for s.entryCnt > c {
+ s.collectGarbage(ratio)
+ }
+ }
+}
+
+func (s *DbStore) getEntryCnt() uint64 {
+ return s.entryCnt
+}
+
+func (s *DbStore) close() {
+ s.db.Close()
+}
+
+// describes a section of the DbStore representing the unsynced
+// domain relevant to a peer
+// Start - Stop designate a continuous area Keys in an address space
+// typically the addresses closer to us than to the peer but not closer
+// another closer peer in between
+// From - To designates a time interval typically from the last disconnect
+// till the latest connection (real time traffic is relayed)
+type DbSyncState struct {
+ Start, Stop Key
+ First, Last uint64
+}
+
+// implements the syncer iterator interface
+// iterates by storage index (~ time of storage = first entry to db)
+type dbSyncIterator struct {
+ it iterator.Iterator
+ DbSyncState
+}
+
+// initialises a sync iterator from a syncToken (passed in with the handshake)
+func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) {
+ if state.First > state.Last {
+ return nil, fmt.Errorf("no entries found")
+ }
+ si = &dbSyncIterator{
+ it: self.db.NewIterator(),
+ DbSyncState: state,
+ }
+ si.it.Seek(getIndexKey(state.Start))
+ return si, nil
+}
+
+// walk the area from Start to Stop and returns items within time interval
+// First to Last
+func (self *dbSyncIterator) Next() (key Key) {
+ for self.it.Valid() {
+ dbkey := self.it.Key()
+ if dbkey[0] != 0 {
+ break
+ }
+ key = Key(make([]byte, len(dbkey)-1))
+ copy(key[:], dbkey[1:])
+ if bytes.Compare(key[:], self.Start) <= 0 {
+ self.it.Next()
+ continue
+ }
+ if bytes.Compare(key[:], self.Stop) > 0 {
+ break
+ }
+ var index dpaDBIndex
+ decodeIndex(self.it.Value(), &index)
+ self.it.Next()
+ if (index.Idx >= self.First) && (index.Idx < self.Last) {
+ return
+ }
+ }
+ self.it.Release()
+ return nil
+}
diff --git a/swarm/storage/dbstore_test.go b/swarm/storage/dbstore_test.go
new file mode 100644
index 000000000..e2f36a6bc
--- /dev/null
+++ b/swarm/storage/dbstore_test.go
@@ -0,0 +1,191 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "bytes"
+ "io/ioutil"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+)
+
+func initDbStore(t *testing.T) *DbStore {
+ dir, err := ioutil.TempDir("", "bzz-storage-test")
+ if err != nil {
+ t.Fatal(err)
+ }
+ m, err := NewDbStore(dir, MakeHashFunc(defaultHash), defaultDbCapacity, defaultRadius)
+ if err != nil {
+ t.Fatal("can't create store:", err)
+ }
+ return m
+}
+
+func testDbStore(l int64, branches int64, t *testing.T) {
+ m := initDbStore(t)
+ defer m.close()
+ testStore(m, l, branches, t)
+}
+
+func TestDbStore128_0x1000000(t *testing.T) {
+ testDbStore(0x1000000, 128, t)
+}
+
+func TestDbStore128_10000_(t *testing.T) {
+ testDbStore(10000, 128, t)
+}
+
+func TestDbStore128_1000_(t *testing.T) {
+ testDbStore(1000, 128, t)
+}
+
+func TestDbStore128_100_(t *testing.T) {
+ testDbStore(100, 128, t)
+}
+
+func TestDbStore2_100_(t *testing.T) {
+ testDbStore(100, 2, t)
+}
+
+func TestDbStoreNotFound(t *testing.T) {
+ m := initDbStore(t)
+ defer m.close()
+ _, err := m.Get(ZeroKey)
+ if err != notFound {
+ t.Errorf("Expected notFound, got %v", err)
+ }
+}
+
+func TestDbStoreSyncIterator(t *testing.T) {
+ m := initDbStore(t)
+ defer m.close()
+ keys := []Key{
+ Key(common.Hex2Bytes("0000000000000000000000000000000000000000000000000000000000000000")),
+ Key(common.Hex2Bytes("4000000000000000000000000000000000000000000000000000000000000000")),
+ Key(common.Hex2Bytes("5000000000000000000000000000000000000000000000000000000000000000")),
+ Key(common.Hex2Bytes("3000000000000000000000000000000000000000000000000000000000000000")),
+ Key(common.Hex2Bytes("2000000000000000000000000000000000000000000000000000000000000000")),
+ Key(common.Hex2Bytes("1000000000000000000000000000000000000000000000000000000000000000")),
+ }
+ for _, key := range keys {
+ m.Put(NewChunk(key, nil))
+ }
+ it, err := m.NewSyncIterator(DbSyncState{
+ Start: Key(common.Hex2Bytes("1000000000000000000000000000000000000000000000000000000000000000")),
+ Stop: Key(common.Hex2Bytes("4000000000000000000000000000000000000000000000000000000000000000")),
+ First: 2,
+ Last: 4,
+ })
+ if err != nil {
+ t.Fatalf("unexpected error creating NewSyncIterator")
+ }
+
+ var chunk Key
+ var res []Key
+ for {
+ chunk = it.Next()
+ if chunk == nil {
+ break
+ }
+ res = append(res, chunk)
+ }
+ if len(res) != 1 {
+ t.Fatalf("Expected 1 chunk, got %v: %v", len(res), res)
+ }
+ if !bytes.Equal(res[0][:], keys[3]) {
+ t.Fatalf("Expected %v chunk, got %v", keys[3], res[0])
+ }
+
+ if err != nil {
+ t.Fatalf("unexpected error creating NewSyncIterator")
+ }
+
+ it, err = m.NewSyncIterator(DbSyncState{
+ Start: Key(common.Hex2Bytes("1000000000000000000000000000000000000000000000000000000000000000")),
+ Stop: Key(common.Hex2Bytes("5000000000000000000000000000000000000000000000000000000000000000")),
+ First: 2,
+ Last: 4,
+ })
+
+ res = nil
+ for {
+ chunk = it.Next()
+ if chunk == nil {
+ break
+ }
+ res = append(res, chunk)
+ }
+ if len(res) != 2 {
+ t.Fatalf("Expected 2 chunk, got %v: %v", len(res), res)
+ }
+ if !bytes.Equal(res[0][:], keys[3]) {
+ t.Fatalf("Expected %v chunk, got %v", keys[3], res[0])
+ }
+ if !bytes.Equal(res[1][:], keys[2]) {
+ t.Fatalf("Expected %v chunk, got %v", keys[2], res[1])
+ }
+
+ if err != nil {
+ t.Fatalf("unexpected error creating NewSyncIterator")
+ }
+
+ it, err = m.NewSyncIterator(DbSyncState{
+ Start: Key(common.Hex2Bytes("1000000000000000000000000000000000000000000000000000000000000000")),
+ Stop: Key(common.Hex2Bytes("4000000000000000000000000000000000000000000000000000000000000000")),
+ First: 2,
+ Last: 5,
+ })
+ res = nil
+ for {
+ chunk = it.Next()
+ if chunk == nil {
+ break
+ }
+ res = append(res, chunk)
+ }
+ if len(res) != 2 {
+ t.Fatalf("Expected 2 chunk, got %v", len(res))
+ }
+ if !bytes.Equal(res[0][:], keys[4]) {
+ t.Fatalf("Expected %v chunk, got %v", keys[4], res[0])
+ }
+ if !bytes.Equal(res[1][:], keys[3]) {
+ t.Fatalf("Expected %v chunk, got %v", keys[3], res[1])
+ }
+
+ it, err = m.NewSyncIterator(DbSyncState{
+ Start: Key(common.Hex2Bytes("2000000000000000000000000000000000000000000000000000000000000000")),
+ Stop: Key(common.Hex2Bytes("4000000000000000000000000000000000000000000000000000000000000000")),
+ First: 2,
+ Last: 5,
+ })
+ res = nil
+ for {
+ chunk = it.Next()
+ if chunk == nil {
+ break
+ }
+ res = append(res, chunk)
+ }
+ if len(res) != 1 {
+ t.Fatalf("Expected 1 chunk, got %v", len(res))
+ }
+ if !bytes.Equal(res[0][:], keys[3]) {
+ t.Fatalf("Expected %v chunk, got %v", keys[3], res[0])
+ }
+}
diff --git a/swarm/storage/dpa.go b/swarm/storage/dpa.go
new file mode 100644
index 000000000..31b6c54ac
--- /dev/null
+++ b/swarm/storage/dpa.go
@@ -0,0 +1,239 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "errors"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+/*
+DPA provides the client API entrypoints Store and Retrieve to store and retrieve
+It can store anything that has a byte slice representation, so files or serialised objects etc.
+
+Storage: DPA calls the Chunker to segment the input datastream of any size to a merkle hashed tree of chunks. The key of the root block is returned to the client.
+
+Retrieval: given the key of the root block, the DPA retrieves the block chunks and reconstructs the original data and passes it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing, i.e. the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read.
+
+As the chunker produces chunks, DPA dispatches them to its own chunk store
+implementation for storage or retrieval.
+*/
+
+const (
+ storeChanCapacity = 100
+ retrieveChanCapacity = 100
+ singletonSwarmDbCapacity = 50000
+ singletonSwarmCacheCapacity = 500
+ maxStoreProcesses = 8
+ maxRetrieveProcesses = 8
+)
+
+var (
+ notFound = errors.New("not found")
+)
+
+type DPA struct {
+ ChunkStore
+ storeC chan *Chunk
+ retrieveC chan *Chunk
+ Chunker Chunker
+
+ lock sync.Mutex
+ running bool
+ wg *sync.WaitGroup
+ quitC chan bool
+}
+
+// for testing locally
+func NewLocalDPA(datadir string) (*DPA, error) {
+
+ hash := MakeHashFunc("SHA256")
+
+ dbStore, err := NewDbStore(datadir, hash, singletonSwarmDbCapacity, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewDPA(&LocalStore{
+ NewMemStore(dbStore, singletonSwarmCacheCapacity),
+ dbStore,
+ }, NewChunkerParams()), nil
+}
+
+func NewDPA(store ChunkStore, params *ChunkerParams) *DPA {
+ chunker := NewTreeChunker(params)
+ return &DPA{
+ Chunker: chunker,
+ ChunkStore: store,
+ }
+}
+
+// Public API. Main entry point for document retrieval directly. Used by the
+// FS-aware API and httpaccess
+// Chunk retrieval blocks on netStore requests with a timeout so reader will
+// report error if retrieval of chunks within requested range time out.
+func (self *DPA) Retrieve(key Key) LazySectionReader {
+ return self.Chunker.Join(key, self.retrieveC)
+}
+
+// Public API. Main entry point for document storage directly. Used by the
+// FS-aware API and httpaccess
+func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error) {
+ return self.Chunker.Split(data, size, self.storeC, swg, wwg)
+}
+
+func (self *DPA) Start() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.running {
+ return
+ }
+ self.running = true
+ self.retrieveC = make(chan *Chunk, retrieveChanCapacity)
+ self.storeC = make(chan *Chunk, storeChanCapacity)
+ self.quitC = make(chan bool)
+ self.storeLoop()
+ self.retrieveLoop()
+}
+
+func (self *DPA) Stop() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if !self.running {
+ return
+ }
+ self.running = false
+ close(self.quitC)
+}
+
+// retrieveLoop dispatches the parallel chunk retrieval requests received on the
+// retrieve channel to its ChunkStore (NetStore or LocalStore)
+func (self *DPA) retrieveLoop() {
+ for i := 0; i < maxRetrieveProcesses; i++ {
+ go self.retrieveWorker()
+ }
+ glog.V(logger.Detail).Infof("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses)
+}
+
+func (self *DPA) retrieveWorker() {
+ for chunk := range self.retrieveC {
+ glog.V(logger.Detail).Infof("dpa: retrieve loop : chunk %v", chunk.Key.Log())
+ storedChunk, err := self.Get(chunk.Key)
+ if err == notFound {
+ glog.V(logger.Detail).Infof("chunk %v not found", chunk.Key.Log())
+ } else if err != nil {
+ glog.V(logger.Detail).Infof("error retrieving chunk %v: %v", chunk.Key.Log(), err)
+ } else {
+ chunk.SData = storedChunk.SData
+ chunk.Size = storedChunk.Size
+ }
+ close(chunk.C)
+
+ select {
+ case <-self.quitC:
+ return
+ default:
+ }
+ }
+}
+
+// storeLoop dispatches the parallel chunk store request processors
+// received on the store channel to its ChunkStore (NetStore or LocalStore)
+func (self *DPA) storeLoop() {
+ for i := 0; i < maxStoreProcesses; i++ {
+ go self.storeWorker()
+ }
+ glog.V(logger.Detail).Infof("dpa: store spawning %v workers", maxStoreProcesses)
+}
+
+func (self *DPA) storeWorker() {
+
+ for chunk := range self.storeC {
+ self.Put(chunk)
+ if chunk.wg != nil {
+ glog.V(logger.Detail).Infof("dpa: store processor %v", chunk.Key.Log())
+ chunk.wg.Done()
+
+ }
+ select {
+ case <-self.quitC:
+ return
+ default:
+ }
+ }
+}
+
+// DpaChunkStore implements the ChunkStore interface,
+// this chunk access layer assumed 2 chunk stores
+// local storage eg. LocalStore and network storage eg., NetStore
+// access by calling network is blocking with a timeout
+
+type dpaChunkStore struct {
+ n int
+ localStore ChunkStore
+ netStore ChunkStore
+}
+
+func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore {
+ return &dpaChunkStore{0, localStore, netStore}
+}
+
+// Get is the entrypoint for local retrieve requests
+// waits for response or times out
+func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) {
+ chunk, err = self.netStore.Get(key)
+ // timeout := time.Now().Add(searchTimeout)
+ if chunk.SData != nil {
+ glog.V(logger.Detail).Infof("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData))
+ return
+ }
+ // TODO: use self.timer time.Timer and reset with defer disableTimer
+ timer := time.After(searchTimeout)
+ select {
+ case <-timer:
+ glog.V(logger.Detail).Infof("DPA.Get: %v request time out ", key.Log())
+ err = notFound
+ case <-chunk.Req.C:
+ glog.V(logger.Detail).Infof("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk)
+ }
+ return
+}
+
+// Put is the entrypoint for local store requests coming from storeLoop
+func (self *dpaChunkStore) Put(entry *Chunk) {
+ chunk, err := self.localStore.Get(entry.Key)
+ if err != nil {
+ glog.V(logger.Detail).Infof("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log())
+ chunk = entry
+ } else if chunk.SData == nil {
+ glog.V(logger.Detail).Infof("DPA.Put: %v request entry found", entry.Key.Log())
+ chunk.SData = entry.SData
+ chunk.Size = entry.Size
+ } else {
+ glog.V(logger.Detail).Infof("DPA.Put: %v chunk already known", entry.Key.Log())
+ return
+ }
+ // from this point on the storage logic is the same with network storage requests
+ glog.V(logger.Detail).Infof("DPA.Put %v: %v", self.n, chunk.Key.Log())
+ self.n++
+ self.netStore.Put(chunk)
+}
diff --git a/swarm/storage/dpa_test.go b/swarm/storage/dpa_test.go
new file mode 100644
index 000000000..1cde1c00e
--- /dev/null
+++ b/swarm/storage/dpa_test.go
@@ -0,0 +1,144 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "bytes"
+ "io"
+ "io/ioutil"
+ "os"
+ "sync"
+ "testing"
+)
+
+const testDataSize = 0x1000000
+
+func TestDPArandom(t *testing.T) {
+ dbStore := initDbStore(t)
+ dbStore.setCapacity(50000)
+ memStore := NewMemStore(dbStore, defaultCacheCapacity)
+ localStore := &LocalStore{
+ memStore,
+ dbStore,
+ }
+ chunker := NewTreeChunker(NewChunkerParams())
+ dpa := &DPA{
+ Chunker: chunker,
+ ChunkStore: localStore,
+ }
+ dpa.Start()
+ defer dpa.Stop()
+ defer os.RemoveAll("/tmp/bzz")
+
+ reader, slice := testDataReaderAndSlice(testDataSize)
+ wg := &sync.WaitGroup{}
+ key, err := dpa.Store(reader, testDataSize, wg, nil)
+ if err != nil {
+ t.Errorf("Store error: %v", err)
+ }
+ wg.Wait()
+ resultReader := dpa.Retrieve(key)
+ resultSlice := make([]byte, len(slice))
+ n, err := resultReader.ReadAt(resultSlice, 0)
+ if err != io.EOF {
+ t.Errorf("Retrieve error: %v", err)
+ }
+ if n != len(slice) {
+ t.Errorf("Slice size error got %d, expected %d.", n, len(slice))
+ }
+ if !bytes.Equal(slice, resultSlice) {
+ t.Errorf("Comparison error.")
+ }
+ ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666)
+ ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666)
+ localStore.memStore = NewMemStore(dbStore, defaultCacheCapacity)
+ resultReader = dpa.Retrieve(key)
+ for i, _ := range resultSlice {
+ resultSlice[i] = 0
+ }
+ n, err = resultReader.ReadAt(resultSlice, 0)
+ if err != io.EOF {
+ t.Errorf("Retrieve error after removing memStore: %v", err)
+ }
+ if n != len(slice) {
+ t.Errorf("Slice size error after removing memStore got %d, expected %d.", n, len(slice))
+ }
+ if !bytes.Equal(slice, resultSlice) {
+ t.Errorf("Comparison error after removing memStore.")
+ }
+}
+
+func TestDPA_capacity(t *testing.T) {
+ dbStore := initDbStore(t)
+ memStore := NewMemStore(dbStore, defaultCacheCapacity)
+ localStore := &LocalStore{
+ memStore,
+ dbStore,
+ }
+ memStore.setCapacity(0)
+ chunker := NewTreeChunker(NewChunkerParams())
+ dpa := &DPA{
+ Chunker: chunker,
+ ChunkStore: localStore,
+ }
+ dpa.Start()
+ reader, slice := testDataReaderAndSlice(testDataSize)
+ wg := &sync.WaitGroup{}
+ key, err := dpa.Store(reader, testDataSize, wg, nil)
+ if err != nil {
+ t.Errorf("Store error: %v", err)
+ }
+ wg.Wait()
+ resultReader := dpa.Retrieve(key)
+ resultSlice := make([]byte, len(slice))
+ n, err := resultReader.ReadAt(resultSlice, 0)
+ if err != io.EOF {
+ t.Errorf("Retrieve error: %v", err)
+ }
+ if n != len(slice) {
+ t.Errorf("Slice size error got %d, expected %d.", n, len(slice))
+ }
+ if !bytes.Equal(slice, resultSlice) {
+ t.Errorf("Comparison error.")
+ }
+ // Clear memStore
+ memStore.setCapacity(0)
+ // check whether it is, indeed, empty
+ dpa.ChunkStore = memStore
+ resultReader = dpa.Retrieve(key)
+ n, err = resultReader.ReadAt(resultSlice, 0)
+ if err == nil {
+ t.Errorf("Was able to read %d bytes from an empty memStore.", len(slice))
+ }
+ // check how it works with localStore
+ dpa.ChunkStore = localStore
+ // localStore.dbStore.setCapacity(0)
+ resultReader = dpa.Retrieve(key)
+ for i, _ := range resultSlice {
+ resultSlice[i] = 0
+ }
+ n, err = resultReader.ReadAt(resultSlice, 0)
+ if err != io.EOF {
+ t.Errorf("Retrieve error after clearing memStore: %v", err)
+ }
+ if n != len(slice) {
+ t.Errorf("Slice size error after clearing memStore got %d, expected %d.", n, len(slice))
+ }
+ if !bytes.Equal(slice, resultSlice) {
+ t.Errorf("Comparison error after clearing memStore.")
+ }
+}
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go
new file mode 100644
index 000000000..541462f0c
--- /dev/null
+++ b/swarm/storage/localstore.go
@@ -0,0 +1,74 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "encoding/binary"
+)
+
+// LocalStore is a combination of inmemory db over a disk persisted db
+// implements a Get/Put with fallback (caching) logic using any 2 ChunkStores
+type LocalStore struct {
+ memStore ChunkStore
+ DbStore ChunkStore
+}
+
+// This constructor uses MemStore and DbStore as components
+func NewLocalStore(hash Hasher, params *StoreParams) (*LocalStore, error) {
+ dbStore, err := NewDbStore(params.ChunkDbPath, hash, params.DbCapacity, params.Radius)
+ if err != nil {
+ return nil, err
+ }
+ return &LocalStore{
+ memStore: NewMemStore(dbStore, params.CacheCapacity),
+ DbStore: dbStore,
+ }, nil
+}
+
+// LocalStore is itself a chunk store
+// unsafe, in that the data is not integrity checked
+func (self *LocalStore) Put(chunk *Chunk) {
+ chunk.dbStored = make(chan bool)
+ self.memStore.Put(chunk)
+ if chunk.wg != nil {
+ chunk.wg.Add(1)
+ }
+ go func() {
+ self.DbStore.Put(chunk)
+ if chunk.wg != nil {
+ chunk.wg.Done()
+ }
+ }()
+}
+
+// Get(chunk *Chunk) looks up a chunk in the local stores
+// This method is blocking until the chunk is retrieved
+// so additional timeout may be needed to wrap this call if
+// ChunkStores are remote and can have long latency
+func (self *LocalStore) Get(key Key) (chunk *Chunk, err error) {
+ chunk, err = self.memStore.Get(key)
+ if err == nil {
+ return
+ }
+ chunk, err = self.DbStore.Get(key)
+ if err != nil {
+ return
+ }
+ chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+ self.memStore.Put(chunk)
+ return
+}
diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go
new file mode 100644
index 000000000..f133bd7d3
--- /dev/null
+++ b/swarm/storage/memstore.go
@@ -0,0 +1,316 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// memory storage layer for the package blockhash
+
+package storage
+
+import (
+ "sync"
+)
+
+const (
+ memTreeLW = 2 // log2(subtree count) of the subtrees
+ memTreeFLW = 14 // log2(subtree count) of the root layer
+ dbForceUpdateAccessCnt = 1000
+ defaultCacheCapacity = 5000
+)
+
+type MemStore struct {
+ memtree *memTree
+ entryCnt, capacity uint // stored entries
+ accessCnt uint64 // access counter; oldest is thrown away when full
+ dbAccessCnt uint64
+ dbStore *DbStore
+ lock sync.Mutex
+}
+
+/*
+a hash prefix subtree containing subtrees or one storage entry (but never both)
+
+- access[0] stores the smallest (oldest) access count value in this subtree
+- if it contains more subtrees and its subtree count is at least 4, access[1:2]
+ stores the smallest access count in the first and second halves of subtrees
+ (so that access[0] = min(access[1], access[2])
+- likewise, if subtree count is at least 8,
+ access[1] = min(access[3], access[4])
+ access[2] = min(access[5], access[6])
+ (access[] is a binary tree inside the multi-bit leveled hash tree)
+*/
+
+func NewMemStore(d *DbStore, capacity uint) (m *MemStore) {
+ m = &MemStore{}
+ m.memtree = newMemTree(memTreeFLW, nil, 0)
+ m.dbStore = d
+ m.setCapacity(capacity)
+ return
+}
+
+type memTree struct {
+ subtree []*memTree
+ parent *memTree
+ parentIdx uint
+
+ bits uint // log2(subtree count)
+ width uint // subtree count
+
+ entry *Chunk // if subtrees are present, entry should be nil
+ lastDBaccess uint64
+ access []uint64
+}
+
+func newMemTree(b uint, parent *memTree, pidx uint) (node *memTree) {
+ node = new(memTree)
+ node.bits = b
+ node.width = 1 << uint(b)
+ node.subtree = make([]*memTree, node.width)
+ node.access = make([]uint64, node.width-1)
+ node.parent = parent
+ node.parentIdx = pidx
+ if parent != nil {
+ parent.subtree[pidx] = node
+ }
+
+ return node
+}
+
+func (node *memTree) updateAccess(a uint64) {
+ aidx := uint(0)
+ var aa uint64
+ oa := node.access[0]
+ for node.access[aidx] == oa {
+ node.access[aidx] = a
+ if aidx > 0 {
+ aa = node.access[((aidx-1)^1)+1]
+ aidx = (aidx - 1) >> 1
+ } else {
+ pidx := node.parentIdx
+ node = node.parent
+ if node == nil {
+ return
+ }
+ nn := node.subtree[pidx^1]
+ if nn != nil {
+ aa = nn.access[0]
+ } else {
+ aa = 0
+ }
+ aidx = (node.width + pidx - 2) >> 1
+ }
+
+ if (aa != 0) && (aa < a) {
+ a = aa
+ }
+ }
+}
+
+func (s *MemStore) setCapacity(c uint) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ for c < s.entryCnt {
+ s.removeOldest()
+ }
+ s.capacity = c
+}
+
+func (s *MemStore) getEntryCnt() uint {
+ return s.entryCnt
+}
+
+// entry (not its copy) is going to be in MemStore
+func (s *MemStore) Put(entry *Chunk) {
+ if s.capacity == 0 {
+ return
+ }
+
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.entryCnt >= s.capacity {
+ s.removeOldest()
+ }
+
+ s.accessCnt++
+
+ node := s.memtree
+ bitpos := uint(0)
+ for node.entry == nil {
+ l := entry.Key.bits(bitpos, node.bits)
+ st := node.subtree[l]
+ if st == nil {
+ st = newMemTree(memTreeLW, node, l)
+ bitpos += node.bits
+ node = st
+ break
+ }
+ bitpos += node.bits
+ node = st
+ }
+
+ if node.entry != nil {
+
+ if node.entry.Key.isEqual(entry.Key) {
+ node.updateAccess(s.accessCnt)
+ if entry.SData == nil {
+ entry.Size = node.entry.Size
+ entry.SData = node.entry.SData
+ }
+ if entry.Req == nil {
+ entry.Req = node.entry.Req
+ }
+ entry.C = node.entry.C
+ node.entry = entry
+ return
+ }
+
+ for node.entry != nil {
+
+ l := node.entry.Key.bits(bitpos, node.bits)
+ st := node.subtree[l]
+ if st == nil {
+ st = newMemTree(memTreeLW, node, l)
+ }
+ st.entry = node.entry
+ node.entry = nil
+ st.updateAccess(node.access[0])
+
+ l = entry.Key.bits(bitpos, node.bits)
+ st = node.subtree[l]
+ if st == nil {
+ st = newMemTree(memTreeLW, node, l)
+ }
+ bitpos += node.bits
+ node = st
+
+ }
+ }
+
+ node.entry = entry
+ node.lastDBaccess = s.dbAccessCnt
+ node.updateAccess(s.accessCnt)
+ s.entryCnt++
+
+ return
+}
+
+func (s *MemStore) Get(hash Key) (chunk *Chunk, err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ node := s.memtree
+ bitpos := uint(0)
+ for node.entry == nil {
+ l := hash.bits(bitpos, node.bits)
+ st := node.subtree[l]
+ if st == nil {
+ return nil, notFound
+ }
+ bitpos += node.bits
+ node = st
+ }
+
+ if node.entry.Key.isEqual(hash) {
+ s.accessCnt++
+ node.updateAccess(s.accessCnt)
+ chunk = node.entry
+ if s.dbAccessCnt-node.lastDBaccess > dbForceUpdateAccessCnt {
+ s.dbAccessCnt++
+ node.lastDBaccess = s.dbAccessCnt
+ if s.dbStore != nil {
+ s.dbStore.updateAccessCnt(hash)
+ }
+ }
+ } else {
+ err = notFound
+ }
+
+ return
+}
+
+func (s *MemStore) removeOldest() {
+ node := s.memtree
+
+ for node.entry == nil {
+
+ aidx := uint(0)
+ av := node.access[aidx]
+
+ for aidx < node.width/2-1 {
+ if av == node.access[aidx*2+1] {
+ node.access[aidx] = node.access[aidx*2+2]
+ aidx = aidx*2 + 1
+ } else if av == node.access[aidx*2+2] {
+ node.access[aidx] = node.access[aidx*2+1]
+ aidx = aidx*2 + 2
+ } else {
+ panic(nil)
+ }
+ }
+ pidx := aidx*2 + 2 - node.width
+ if (node.subtree[pidx] != nil) && (av == node.subtree[pidx].access[0]) {
+ if node.subtree[pidx+1] != nil {
+ node.access[aidx] = node.subtree[pidx+1].access[0]
+ } else {
+ node.access[aidx] = 0
+ }
+ } else if (node.subtree[pidx+1] != nil) && (av == node.subtree[pidx+1].access[0]) {
+ if node.subtree[pidx] != nil {
+ node.access[aidx] = node.subtree[pidx].access[0]
+ } else {
+ node.access[aidx] = 0
+ }
+ pidx++
+ } else {
+ panic(nil)
+ }
+
+ //fmt.Println(pidx)
+ node = node.subtree[pidx]
+
+ }
+
+ if node.entry.dbStored != nil {
+ <-node.entry.dbStored
+ }
+
+ if node.entry.SData != nil {
+ node.entry = nil
+ s.entryCnt--
+ }
+
+ node.access[0] = 0
+
+ //---
+
+ aidx := uint(0)
+ for {
+ aa := node.access[aidx]
+ if aidx > 0 {
+ aidx = (aidx - 1) >> 1
+ } else {
+ pidx := node.parentIdx
+ node = node.parent
+ if node == nil {
+ return
+ }
+ aidx = (node.width + pidx - 2) >> 1
+ }
+ if (aa != 0) && ((aa < node.access[aidx]) || (node.access[aidx] == 0)) {
+ node.access[aidx] = aa
+ }
+ }
+}
diff --git a/swarm/storage/memstore_test.go b/swarm/storage/memstore_test.go
new file mode 100644
index 000000000..2e0ab535a
--- /dev/null
+++ b/swarm/storage/memstore_test.go
@@ -0,0 +1,50 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "testing"
+)
+
+func testMemStore(l int64, branches int64, t *testing.T) {
+ m := NewMemStore(nil, defaultCacheCapacity)
+ testStore(m, l, branches, t)
+}
+
+func TestMemStore128_10000(t *testing.T) {
+ testMemStore(10000, 128, t)
+}
+
+func TestMemStore128_1000(t *testing.T) {
+ testMemStore(1000, 128, t)
+}
+
+func TestMemStore128_100(t *testing.T) {
+ testMemStore(100, 128, t)
+}
+
+func TestMemStore2_100(t *testing.T) {
+ testMemStore(100, 2, t)
+}
+
+func TestMemStoreNotFound(t *testing.T) {
+ m := NewMemStore(nil, defaultCacheCapacity)
+ _, err := m.Get(ZeroKey)
+ if err != notFound {
+ t.Errorf("Expected notFound, got %v", err)
+ }
+}
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
new file mode 100644
index 000000000..334229aed
--- /dev/null
+++ b/swarm/storage/netstore.go
@@ -0,0 +1,134 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "path/filepath"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+/*
+NetStore is a cloud storage access abstaction layer for swarm
+it contains the shared logic of network served chunk store/retrieval requests
+both local (coming from DPA api) and remote (coming from peers via bzz protocol)
+it implements the ChunkStore interface and embeds LocalStore
+
+It is called by the bzz protocol instances via Depo (the store/retrieve request handler)
+a protocol instance is running on each peer, so this is heavily parallelised.
+NetStore falls back to a backend (CloudStorage interface)
+implemented by bzz/network/forwarder. forwarder or IPFS or IPΞS
+*/
+type NetStore struct {
+ hashfunc Hasher
+ localStore *LocalStore
+ cloud CloudStore
+ lock sync.Mutex
+}
+
+// backend engine for cloud store
+// It can be aggregate dispatching to several parallel implementations:
+// bzz/network/forwarder. forwarder or IPFS or IPΞS
+type CloudStore interface {
+ Store(*Chunk)
+ Deliver(*Chunk)
+ Retrieve(*Chunk)
+}
+
+type StoreParams struct {
+ ChunkDbPath string
+ DbCapacity uint64
+ CacheCapacity uint
+ Radius int
+}
+
+func NewStoreParams(path string) (self *StoreParams) {
+ return &StoreParams{
+ ChunkDbPath: filepath.Join(path, "chunks"),
+ DbCapacity: defaultDbCapacity,
+ CacheCapacity: defaultCacheCapacity,
+ Radius: defaultRadius,
+ }
+}
+
+// netstore contructor, takes path argument that is used to initialise dbStore,
+// the persistent (disk) storage component of LocalStore
+// the second argument is the hive, the connection/logistics manager for the node
+func NewNetStore(hash Hasher, lstore *LocalStore, cloud CloudStore, params *StoreParams) *NetStore {
+ return &NetStore{
+ hashfunc: hash,
+ localStore: lstore,
+ cloud: cloud,
+ }
+}
+
+const (
+ // maximum number of peers that a retrieved message is delivered to
+ requesterCount = 3
+)
+
+var (
+ // timeout interval before retrieval is timed out
+ searchTimeout = 3 * time.Second
+)
+
+// store logic common to local and network chunk store requests
+// ~ unsafe put in localdb no check if exists no extra copy no hash validation
+// the chunk is forced to propagate (Cloud.Store) even if locally found!
+// caller needs to make sure if that is wanted
+func (self *NetStore) Put(entry *Chunk) {
+ self.localStore.Put(entry)
+
+ // handle deliveries
+ if entry.Req != nil {
+ glog.V(logger.Detail).Infof("NetStore.Put: localStore.Put %v hit existing request...delivering", entry.Key.Log())
+ // closing C singals to other routines (local requests)
+ // that the chunk is has been retrieved
+ close(entry.Req.C)
+ // deliver the chunk to requesters upstream
+ go self.cloud.Deliver(entry)
+ } else {
+ glog.V(logger.Detail).Infof("NetStore.Put: localStore.Put %v stored locally", entry.Key.Log())
+ // handle propagating store requests
+ // go self.cloud.Store(entry)
+ go self.cloud.Store(entry)
+ }
+}
+
+// retrieve logic common for local and network chunk retrieval requests
+func (self *NetStore) Get(key Key) (*Chunk, error) {
+ var err error
+ chunk, err := self.localStore.Get(key)
+ if err == nil {
+ if chunk.Req == nil {
+ glog.V(logger.Detail).Infof("NetStore.Get: %v found locally", key)
+ } else {
+ glog.V(logger.Detail).Infof("NetStore.Get: %v hit on an existing request", key)
+ // no need to launch again
+ }
+ return chunk, err
+ }
+ // no data and no request status
+ glog.V(logger.Detail).Infof("NetStore.Get: %v not found locally. open new request", key)
+ chunk = NewChunk(key, newRequestStatus(key))
+ self.localStore.memStore.Put(chunk)
+ go self.cloud.Retrieve(chunk)
+ return chunk, nil
+}
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
new file mode 100644
index 000000000..3c1ef17a0
--- /dev/null
+++ b/swarm/storage/pyramid.go
@@ -0,0 +1,211 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "math"
+ "strings"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/common"
+)
+
+const (
+ processors = 8
+)
+
+type Tree struct {
+ Chunks int64
+ Levels []map[int64]*Node
+ Lock sync.RWMutex
+}
+
+type Node struct {
+ Pending int64
+ Size uint64
+ Children []common.Hash
+ Last bool
+}
+
+func (self *Node) String() string {
+ var children []string
+ for _, node := range self.Children {
+ children = append(children, node.Hex())
+ }
+ return fmt.Sprintf("pending: %v, size: %v, last :%v, children: %v", self.Pending, self.Size, self.Last, strings.Join(children, ", "))
+}
+
+type Task struct {
+ Index int64 // Index of the chunk being processed
+ Size uint64
+ Data []byte // Binary blob of the chunk
+ Last bool
+}
+
+type PyramidChunker struct {
+ hashFunc Hasher
+ chunkSize int64
+ hashSize int64
+ branches int64
+ workerCount int
+}
+
+func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
+ self = &PyramidChunker{}
+ self.hashFunc = MakeHashFunc(params.Hash)
+ self.branches = params.Branches
+ self.hashSize = int64(self.hashFunc().Size())
+ self.chunkSize = self.hashSize * self.branches
+ self.workerCount = 1
+ return
+}
+
+func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+
+ chunks := (size + self.chunkSize - 1) / self.chunkSize
+ depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1
+ // glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth)
+
+ results := Tree{
+ Chunks: chunks,
+ Levels: make([]map[int64]*Node, depth),
+ }
+ for i := 0; i < depth; i++ {
+ results.Levels[i] = make(map[int64]*Node)
+ }
+ // Create a pool of workers to crunch through the file
+ tasks := make(chan *Task, 2*processors)
+ pend := new(sync.WaitGroup)
+ abortC := make(chan bool)
+ for i := 0; i < processors; i++ {
+ pend.Add(1)
+ go self.processor(pend, swg, tasks, chunkC, &results)
+ }
+ // Feed the chunks into the task pool
+ for index := 0; ; index++ {
+ buffer := make([]byte, self.chunkSize+8)
+ n, err := data.Read(buffer[8:])
+ last := err == io.ErrUnexpectedEOF || err == io.EOF
+ // glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth)
+ if err != nil && !last {
+ // glog.V(logger.Info).Infof("error: %v", err)
+ close(abortC)
+ break
+ }
+ binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
+ pend.Add(1)
+ // glog.V(logger.Info).Infof("-> task %v (%v)", index, n)
+ select {
+ case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
+ case <-abortC:
+ return nil, err
+ }
+ if last {
+ // glog.V(logger.Info).Infof("last task %v (%v)", index, n)
+ break
+ }
+ }
+ // Wait for the workers and return
+ close(tasks)
+ pend.Wait()
+
+ // glog.V(logger.Info).Infof("len: %v", results.Levels[0][0])
+ key := results.Levels[0][0].Children[0][:]
+ return key, nil
+}
+
+func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
+ defer pend.Done()
+
+ // glog.V(logger.Info).Infof("processor started")
+ // Start processing leaf chunks ad infinitum
+ hasher := self.hashFunc()
+ for task := range tasks {
+ depth, pow := len(results.Levels)-1, self.branches
+ // glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last)
+ size := task.Size
+ data := task.Data
+ var node *Node
+ for depth >= 0 {
+ // New chunk received, reset the hasher and start processing
+ hasher.Reset()
+ if node == nil { // Leaf node, hash the data chunk
+ hasher.Write(task.Data)
+ } else { // Internal node, hash the children
+ size = node.Size
+ data = make([]byte, hasher.Size()*len(node.Children)+8)
+ binary.LittleEndian.PutUint64(data[:8], size)
+
+ hasher.Write(data[:8])
+ for i, hash := range node.Children {
+ copy(data[i*hasher.Size()+8:], hash[:])
+ hasher.Write(hash[:])
+ }
+ }
+ hash := hasher.Sum(nil)
+ last := task.Last || (node != nil) && node.Last
+ // Insert the subresult into the memoization tree
+ results.Lock.Lock()
+ if node = results.Levels[depth][task.Index/pow]; node == nil {
+ // Figure out the pending tasks
+ pending := self.branches
+ if task.Index/pow == results.Chunks/pow {
+ pending = (results.Chunks + pow/self.branches - 1) / (pow / self.branches) % self.branches
+ }
+ node = &Node{pending, 0, make([]common.Hash, pending), last}
+ results.Levels[depth][task.Index/pow] = node
+ // glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending)
+ }
+ node.Pending--
+ // glog.V(logger.Info).Infof("pending now: %v", node.Pending)
+ i := task.Index / (pow / self.branches) % self.branches
+ if last {
+ node.Last = true
+ }
+ copy(node.Children[i][:], hash)
+ node.Size += size
+ left := node.Pending
+ // glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size)
+ if chunkC != nil {
+ if swg != nil {
+ swg.Add(1)
+ }
+ select {
+ case chunkC <- &Chunk{Key: hash, SData: data, wg: swg}:
+ // case <- self.quitC
+ }
+ }
+ if depth+1 < len(results.Levels) {
+ delete(results.Levels[depth+1], task.Index/(pow/self.branches))
+ }
+
+ results.Lock.Unlock()
+ // If there's more work to be done, leave for others
+ // glog.V(logger.Info).Infof("left %v", left)
+ if left > 0 {
+ break
+ }
+ // We're the last ones in this batch, merge the children together
+ depth--
+ pow *= self.branches
+ }
+ pend.Done()
+ }
+}
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
new file mode 100644
index 000000000..0dcbc0100
--- /dev/null
+++ b/swarm/storage/types.go
@@ -0,0 +1,232 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "bytes"
+ "crypto"
+ "fmt"
+ "hash"
+ "io"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto/sha3"
+)
+
+type Hasher func() hash.Hash
+
+// Peer is the recorded as Source on the chunk
+// should probably not be here? but network should wrap chunk object
+type Peer interface{}
+
+type Key []byte
+
+func (x Key) Size() uint {
+ return uint(len(x))
+}
+
+func (x Key) isEqual(y Key) bool {
+ return bytes.Compare(x, y) == 0
+}
+
+func (h Key) bits(i, j uint) uint {
+ ii := i >> 3
+ jj := i & 7
+ if ii >= h.Size() {
+ return 0
+ }
+
+ if jj+j <= 8 {
+ return uint((h[ii] >> jj) & ((1 << j) - 1))
+ }
+
+ res := uint(h[ii] >> jj)
+ jj = 8 - jj
+ j -= jj
+ for j != 0 {
+ ii++
+ if j < 8 {
+ res += uint(h[ii]&((1<<j)-1)) << jj
+ return res
+ }
+ res += uint(h[ii]) << jj
+ jj += 8
+ j -= 8
+ }
+ return res
+}
+
+func IsZeroKey(key Key) bool {
+ return len(key) == 0 || bytes.Equal(key, ZeroKey)
+}
+
+var ZeroKey = Key(common.Hash{}.Bytes())
+
+func MakeHashFunc(hash string) Hasher {
+ switch hash {
+ case "SHA256":
+ return crypto.SHA256.New
+ case "SHA3":
+ return sha3.NewKeccak256
+ }
+ return nil
+}
+
+func (key Key) Hex() string {
+ return fmt.Sprintf("%064x", []byte(key[:]))
+}
+
+func (key Key) Log() string {
+ if len(key[:]) < 4 {
+ return fmt.Sprintf("%x", []byte(key[:]))
+ }
+ return fmt.Sprintf("%08x", []byte(key[:4]))
+}
+
+func (key Key) String() string {
+ return fmt.Sprintf("%064x", []byte(key)[:])
+}
+
+func (key Key) MarshalJSON() (out []byte, err error) {
+ return []byte(`"` + key.String() + `"`), nil
+}
+
+func (key *Key) UnmarshalJSON(value []byte) error {
+ s := string(value)
+ *key = make([]byte, 32)
+ h := common.Hex2Bytes(s[1 : len(s)-1])
+ copy(*key, h)
+ return nil
+}
+
+// each chunk when first requested opens a record associated with the request
+// next time a request for the same chunk arrives, this record is updated
+// this request status keeps track of the request ID-s as well as the requesting
+// peers and has a channel that is closed when the chunk is retrieved. Multiple
+// local callers can wait on this channel (or combined with a timeout, block with a
+// select).
+type RequestStatus struct {
+ Key Key
+ Source Peer
+ C chan bool
+ Requesters map[uint64][]interface{}
+}
+
+func newRequestStatus(key Key) *RequestStatus {
+ return &RequestStatus{
+ Key: key,
+ Requesters: make(map[uint64][]interface{}),
+ C: make(chan bool),
+ }
+}
+
+// Chunk also serves as a request object passed to ChunkStores
+// in case it is a retrieval request, Data is nil and Size is 0
+// Note that Size is not the size of the data chunk, which is Data.Size()
+// but the size of the subtree encoded in the chunk
+// 0 if request, to be supplied by the dpa
+type Chunk struct {
+ Key Key // always
+ SData []byte // nil if request, to be supplied by dpa
+ Size int64 // size of the data covered by the subtree encoded in this chunk
+ Source Peer // peer
+ C chan bool // to signal data delivery by the dpa
+ Req *RequestStatus // request Status needed by netStore
+ wg *sync.WaitGroup // wg to synchronize
+ dbStored chan bool // never remove a chunk from memStore before it is written to dbStore
+}
+
+func NewChunk(key Key, rs *RequestStatus) *Chunk {
+ return &Chunk{Key: key, Req: rs}
+}
+
+/*
+The ChunkStore interface is implemented by :
+
+- MemStore: a memory cache
+- DbStore: local disk/db store
+- LocalStore: a combination (sequence of) memStore and dbStore
+- NetStore: cloud storage abstraction layer
+- DPA: local requests for swarm storage and retrieval
+*/
+type ChunkStore interface {
+ Put(*Chunk) // effectively there is no error even if there is an error
+ Get(Key) (*Chunk, error)
+}
+
+/*
+Chunker is the interface to a component that is responsible for disassembling and assembling larger data and indended to be the dependency of a DPA storage system with fixed maximum chunksize.
+
+It relies on the underlying chunking model.
+
+When calling Split, the caller provides a channel (chan *Chunk) on which it receives chunks to store. The DPA delegates to storage layers (implementing ChunkStore interface).
+
+Split returns an error channel, which the caller can monitor.
+After getting notified that all the data has been split (the error channel is closed), the caller can safely read or save the root key. Optionally it times out if not all chunks get stored or not the entire stream of data has been processed. By inspecting the errc channel the caller can check if any explicit errors (typically IO read/write failures) occured during splitting.
+
+When calling Join with a root key, the caller gets returned a seekable lazy reader. The caller again provides a channel on which the caller receives placeholder chunks with missing data. The DPA is supposed to forward this to the chunk stores and notify the chunker if the data has been delivered (i.e. retrieved from memory cache, disk-persisted db or cloud based swarm delivery). As the seekable reader is used, the chunker then puts these together the relevant parts on demand.
+*/
+type Splitter interface {
+ /*
+ When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
+ New chunks to store are coming to caller via the chunk storage channel, which the caller provides.
+ wg is a Waitgroup (can be nil) that can be used to block until the local storage finishes
+ The caller gets returned an error channel, if an error is encountered during splitting, it is fed to errC error channel.
+ A closed error signals process completion at which point the key can be considered final if there were no errors.
+ */
+ Split(io.Reader, int64, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
+}
+
+type Joiner interface {
+ /*
+ Join reconstructs original content based on a root key.
+ When joining, the caller gets returned a Lazy SectionReader, which is
+ seekable and implements on-demand fetching of chunks as and where it is read.
+ New chunks to retrieve are coming to caller via the Chunk channel, which the caller provides.
+ If an error is encountered during joining, it appears as a reader error.
+ The SectionReader.
+ As a result, partial reads from a document are possible even if other parts
+ are corrupt or lost.
+ The chunks are not meant to be validated by the chunker when joining. This
+ is because it is left to the DPA to decide which sources are trusted.
+ */
+ Join(key Key, chunkC chan *Chunk) LazySectionReader
+}
+
+type Chunker interface {
+ Joiner
+ Splitter
+ // returns the key length
+ // KeySize() int64
+}
+
+// Size, Seek, Read, ReadAt
+type LazySectionReader interface {
+ Size(chan bool) (int64, error)
+ io.Seeker
+ io.Reader
+ io.ReaderAt
+}
+
+type LazyTestSectionReader struct {
+ *io.SectionReader
+}
+
+func (self *LazyTestSectionReader) Size(chan bool) (int64, error) {
+ return self.SectionReader.Size(), nil
+}