diff options
Diffstat (limited to 'swarm/storage')
-rw-r--r-- | swarm/storage/chunker.go | 53 | ||||
-rw-r--r-- | swarm/storage/chunker_test.go | 6 | ||||
-rw-r--r-- | swarm/storage/chunkstore.go | 13 | ||||
-rw-r--r-- | swarm/storage/common.go | 3 | ||||
-rw-r--r-- | swarm/storage/common_test.go | 5 | ||||
-rw-r--r-- | swarm/storage/dbapi.go | 14 | ||||
-rw-r--r-- | swarm/storage/hasherstore.go | 12 | ||||
-rw-r--r-- | swarm/storage/hasherstore_test.go | 10 | ||||
-rw-r--r-- | swarm/storage/ldbstore.go | 7 | ||||
-rw-r--r-- | swarm/storage/ldbstore_test.go | 25 | ||||
-rw-r--r-- | swarm/storage/localstore.go | 27 | ||||
-rw-r--r-- | swarm/storage/memstore.go | 5 | ||||
-rw-r--r-- | swarm/storage/memstore_test.go | 11 | ||||
-rw-r--r-- | swarm/storage/mru/resource.go | 18 | ||||
-rw-r--r-- | swarm/storage/mru/resource_test.go | 6 | ||||
-rw-r--r-- | swarm/storage/netstore.go | 41 | ||||
-rw-r--r-- | swarm/storage/netstore_test.go | 7 | ||||
-rw-r--r-- | swarm/storage/pyramid.go | 8 | ||||
-rw-r--r-- | swarm/storage/types.go | 13 |
19 files changed, 179 insertions, 105 deletions
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 2d197fefa..b9b502273 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -26,6 +26,9 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" + olog "github.com/opentracing/opentracing-go/log" ) /* @@ -93,9 +96,12 @@ type JoinerParams struct { getter Getter // TODO: there is a bug, so depth can only be 0 today, see: https://github.com/ethersphere/go-ethereum/issues/344 depth int + ctx context.Context } type TreeChunker struct { + ctx context.Context + branches int64 hashFunc SwarmHasher dataSize int64 @@ -136,6 +142,7 @@ func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *Lazy addr: addr, getter: getter, depth: depth, + ctx: ctx, } return NewTreeJoiner(jp).Join(ctx) @@ -174,6 +181,8 @@ func NewTreeJoiner(params *JoinerParams) *TreeChunker { tc.errC = make(chan error) tc.quitC = make(chan bool) + tc.ctx = params.ctx + return tc } @@ -351,7 +360,7 @@ func (tc *TreeChunker) runWorker() { return } - h, err := tc.putter.Put(job.chunk) + h, err := tc.putter.Put(tc.ctx, job.chunk) if err != nil { tc.errC <- err return @@ -371,6 +380,7 @@ func (tc *TreeChunker) Append() (Address, func(), error) { // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { + Ctx context.Context key Address // root key chunkData ChunkData off int64 // offset @@ -389,16 +399,28 @@ func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader { hashSize: tc.hashSize, depth: tc.depth, getter: tc.getter, + Ctx: tc.ctx, } } +func (r *LazyChunkReader) Context() context.Context { + return r.Ctx +} + // Size is meant to be called on the LazySectionReader -func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { +func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, err error) { metrics.GetOrRegisterCounter("lazychunkreader.size", nil).Inc(1) + var sp opentracing.Span + var cctx context.Context + cctx, sp = spancontext.StartSpan( + ctx, + "lcr.size") + defer sp.Finish() + log.Debug("lazychunkreader.size", "key", r.key) if r.chunkData == nil { - chunkData, err := r.getter.Get(Reference(r.key)) + chunkData, err := r.getter.Get(cctx, Reference(r.key)) if err != nil { return 0, err } @@ -421,12 +443,25 @@ func (r *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { metrics.GetOrRegisterCounter("lazychunkreader.readat", nil).Inc(1) + var sp opentracing.Span + var cctx context.Context + cctx, sp = spancontext.StartSpan( + r.Ctx, + "lcr.read") + defer sp.Finish() + + defer func() { + sp.LogFields( + olog.Int("off", int(off)), + olog.Int("read", read)) + }() + // this is correct, a swarm doc cannot be zero length, so no EOF is expected if len(b) == 0 { return 0, nil } quitC := make(chan bool) - size, err := r.Size(quitC) + size, err := r.Size(cctx, quitC) if err != nil { log.Error("lazychunkreader.readat.size", "size", size, "err", err) return 0, err @@ -449,7 +484,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { length *= r.chunkSize } wg.Add(1) - go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) + go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -467,7 +502,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { return len(b), nil } -func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // find appropriate block level for chunkData.Size() < treeSize && depth > r.depth { @@ -514,7 +549,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS wg.Add(1) go func(j int64) { childKey := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] - chunkData, err := r.getter.Get(Reference(childKey)) + chunkData, err := r.getter.Get(ctx, Reference(childKey)) if err != nil { log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err) select { @@ -533,7 +568,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS if soff < off { soff = off } - r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) + r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) }(i) } //for } @@ -570,7 +605,7 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { offset += r.off case 2: if r.chunkData == nil { //seek from the end requires rootchunk for size. call Size first - _, err := r.Size(nil) + _, err := r.Size(context.TODO(), nil) if err != nil { return 0, fmt.Errorf("can't get size: %v", err) } diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index 69c388b39..dbcc8700d 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -50,11 +50,11 @@ type fakeChunkStore struct { } // Put doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Put(*Chunk) { +func (f *fakeChunkStore) Put(context.Context, *Chunk) { } // Gut doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Get(Address) (*Chunk, error) { +func (f *fakeChunkStore) Get(context.Context, Address) (*Chunk, error) { return nil, errors.New("FakeChunkStore doesn't support Get") } @@ -281,7 +281,7 @@ func TestRandomBrokenData(t *testing.T) { } func benchReadAll(reader LazySectionReader) { - size, _ := reader.Size(nil) + size, _ := reader.Size(context.TODO(), nil) output := make([]byte, 1000) for pos := int64(0); pos < size; pos += 1000 { reader.ReadAt(output, pos) diff --git a/swarm/storage/chunkstore.go b/swarm/storage/chunkstore.go index ce95cd971..3b4d97a7a 100644 --- a/swarm/storage/chunkstore.go +++ b/swarm/storage/chunkstore.go @@ -16,7 +16,10 @@ package storage -import "sync" +import ( + "context" + "sync" +) /* ChunkStore interface is implemented by : @@ -28,8 +31,8 @@ ChunkStore interface is implemented by : - FakeChunkStore: dummy store which doesn't store anything just implements the interface */ type ChunkStore interface { - Put(*Chunk) // effectively there is no error even if there is an error - Get(Address) (*Chunk, error) + Put(context.Context, *Chunk) // effectively there is no error even if there is an error + Get(context.Context, Address) (*Chunk, error) Close() } @@ -45,14 +48,14 @@ func NewMapChunkStore() *MapChunkStore { } } -func (m *MapChunkStore) Put(chunk *Chunk) { +func (m *MapChunkStore) Put(ctx context.Context, chunk *Chunk) { m.mu.Lock() defer m.mu.Unlock() m.chunks[chunk.Addr.Hex()] = chunk chunk.markAsStored() } -func (m *MapChunkStore) Get(addr Address) (*Chunk, error) { +func (m *MapChunkStore) Get(ctx context.Context, addr Address) (*Chunk, error) { m.mu.RLock() defer m.mu.RUnlock() chunk := m.chunks[addr.Hex()] diff --git a/swarm/storage/common.go b/swarm/storage/common.go index d86cb6914..d6352820e 100644 --- a/swarm/storage/common.go +++ b/swarm/storage/common.go @@ -16,6 +16,7 @@ package storage import ( + "context" "sync" "github.com/ethereum/go-ethereum/swarm/log" @@ -37,7 +38,7 @@ func PutChunks(store *LocalStore, chunks ...*Chunk) { } }() for _, c := range chunks { - go store.Put(c) + go store.Put(context.TODO(), c) } wg.Wait() } diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index c6e97d68f..dc1a3ab35 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "context" "crypto/rand" "flag" "fmt" @@ -69,7 +70,7 @@ func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs [ for chunk := range c { wg.Add(1) chunk := chunk - store.Put(chunk) + store.Put(context.TODO(), chunk) go func() { defer wg.Done() <-chunk.dbStoredC @@ -103,7 +104,7 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) for _, k := range hs { go func(h Address) { defer wg.Done() - chunk, err := store.Get(h) + chunk, err := store.Get(context.TODO(), h) if err != nil { errc <- err return diff --git a/swarm/storage/dbapi.go b/swarm/storage/dbapi.go index 24234b031..dd71752eb 100644 --- a/swarm/storage/dbapi.go +++ b/swarm/storage/dbapi.go @@ -16,6 +16,8 @@ package storage +import "context" + // wrapper of db-s to provide mockable custom local chunk store access to syncer type DBAPI struct { db *LDBStore @@ -27,8 +29,8 @@ func NewDBAPI(loc *LocalStore) *DBAPI { } // to obtain the chunks from address or request db entry only -func (d *DBAPI) Get(addr Address) (*Chunk, error) { - return d.loc.Get(addr) +func (d *DBAPI) Get(ctx context.Context, addr Address) (*Chunk, error) { + return d.loc.Get(ctx, addr) } // current storage counter of chunk db @@ -42,11 +44,11 @@ func (d *DBAPI) Iterator(from uint64, to uint64, po uint8, f func(Address, uint6 } // to obtain the chunks from address or request db entry only -func (d *DBAPI) GetOrCreateRequest(addr Address) (*Chunk, bool) { - return d.loc.GetOrCreateRequest(addr) +func (d *DBAPI) GetOrCreateRequest(ctx context.Context, addr Address) (*Chunk, bool) { + return d.loc.GetOrCreateRequest(ctx, addr) } // to obtain the chunks from key or request db entry only -func (d *DBAPI) Put(chunk *Chunk) { - d.loc.Put(chunk) +func (d *DBAPI) Put(ctx context.Context, chunk *Chunk) { + d.loc.Put(ctx, chunk) } diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index e18b66ddc..139c0ee03 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -74,7 +74,7 @@ func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) // Put stores the chunkData into the ChunkStore of the hasherStore and returns the reference. // If hasherStore has a chunkEncryption object, the data will be encrypted. // Asynchronous function, the data will not necessarily be stored when it returns. -func (h *hasherStore) Put(chunkData ChunkData) (Reference, error) { +func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference, error) { c := chunkData size := chunkData.Size() var encryptionKey encryption.Key @@ -87,7 +87,7 @@ func (h *hasherStore) Put(chunkData ChunkData) (Reference, error) { } chunk := h.createChunk(c, size) - h.storeChunk(chunk) + h.storeChunk(ctx, chunk) return Reference(append(chunk.Addr, encryptionKey...)), nil } @@ -95,14 +95,14 @@ func (h *hasherStore) Put(chunkData ChunkData) (Reference, error) { // Get returns data of the chunk with the given reference (retrieved from the ChunkStore of hasherStore). // If the data is encrypted and the reference contains an encryption key, it will be decrypted before // return. -func (h *hasherStore) Get(ref Reference) (ChunkData, error) { +func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error) { key, encryptionKey, err := parseReference(ref, h.hashSize) if err != nil { return nil, err } toDecrypt := (encryptionKey != nil) - chunk, err := h.store.Get(key) + chunk, err := h.store.Get(ctx, key) if err != nil { return nil, err } @@ -207,13 +207,13 @@ func (h *hasherStore) RefSize() int64 { return h.refSize } -func (h *hasherStore) storeChunk(chunk *Chunk) { +func (h *hasherStore) storeChunk(ctx context.Context, chunk *Chunk) { h.wg.Add(1) go func() { <-chunk.dbStoredC h.wg.Done() }() - h.store.Put(chunk) + h.store.Put(ctx, chunk) } func parseReference(ref Reference, hashSize int) (Address, encryption.Key, error) { diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go index cf7b0dcc3..ddf1c39b0 100644 --- a/swarm/storage/hasherstore_test.go +++ b/swarm/storage/hasherstore_test.go @@ -47,13 +47,13 @@ func TestHasherStore(t *testing.T) { // Put two random chunks into the hasherStore chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key1, err := hasherStore.Put(chunkData1) + key1, err := hasherStore.Put(context.TODO(), chunkData1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key2, err := hasherStore.Put(chunkData2) + key2, err := hasherStore.Put(context.TODO(), chunkData2) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } @@ -67,7 +67,7 @@ func TestHasherStore(t *testing.T) { } // Get the first chunk - retrievedChunkData1, err := hasherStore.Get(key1) + retrievedChunkData1, err := hasherStore.Get(context.TODO(), key1) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -78,7 +78,7 @@ func TestHasherStore(t *testing.T) { } // Get the second chunk - retrievedChunkData2, err := hasherStore.Get(key2) + retrievedChunkData2, err := hasherStore.Get(context.TODO(), key2) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -105,7 +105,7 @@ func TestHasherStore(t *testing.T) { } // Check if chunk data in store is encrypted or not - chunkInStore, err := chunkStore.Get(hash1) + chunkInStore, err := chunkStore.Get(context.TODO(), hash1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 178b1ebc4..7920ee767 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -25,6 +25,7 @@ package storage import ( "archive/tar" "bytes" + "context" "encoding/binary" "encoding/hex" "fmt" @@ -370,7 +371,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { key := Address(keybytes) chunk := NewChunk(key, nil) chunk.SData = data[32:] - s.Put(chunk) + s.Put(context.TODO(), chunk) wg.Add(1) go func() { defer wg.Done() @@ -499,7 +500,7 @@ func (s *LDBStore) CurrentStorageIndex() uint64 { return s.dataIdx } -func (s *LDBStore) Put(chunk *Chunk) { +func (s *LDBStore) Put(ctx context.Context, chunk *Chunk) { metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1) log.Trace("ldbstore.put", "key", chunk.Addr) @@ -639,7 +640,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { return true } -func (s *LDBStore) Get(addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1) log.Trace("ldbstore.get", "key", addr) diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 2453d2f30..baf9e8c14 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "context" "fmt" "io/ioutil" "os" @@ -157,7 +158,7 @@ func testDbStoreNotFound(t *testing.T, mock bool) { t.Fatalf("init dbStore failed: %v", err) } - _, err = db.Get(ZeroAddr) + _, err = db.Get(context.TODO(), ZeroAddr) if err != ErrChunkNotFound { t.Errorf("Expected ErrChunkNotFound, got %v", err) } @@ -188,7 +189,7 @@ func testIterator(t *testing.T, mock bool) { wg := &sync.WaitGroup{} wg.Add(len(chunks)) for i = 0; i < len(chunks); i++ { - db.Put(chunks[i]) + db.Put(context.TODO(), chunks[i]) chunkkeys[i] = chunks[i].Addr j := i go func() { @@ -299,7 +300,7 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) { } for i := 0; i < n; i++ { - go ldb.Put(chunks[i]) + go ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored @@ -310,7 +311,7 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) { log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) for i := 0; i < n; i++ { - ret, err := ldb.Get(chunks[i].Addr) + ret, err := ldb.Get(context.TODO(), chunks[i].Addr) if err != nil { t.Fatal(err) } @@ -349,7 +350,7 @@ func TestLDBStoreCollectGarbage(t *testing.T) { } for i := 0; i < n; i++ { - ldb.Put(chunks[i]) + ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored @@ -364,7 +365,7 @@ func TestLDBStoreCollectGarbage(t *testing.T) { var missing int for i := 0; i < n; i++ { - ret, err := ldb.Get(chunks[i].Addr) + ret, err := ldb.Get(context.TODO(), chunks[i].Addr) if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { missing++ continue @@ -403,7 +404,7 @@ func TestLDBStoreAddRemove(t *testing.T) { } for i := 0; i < n; i++ { - go ldb.Put(chunks[i]) + go ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored before continuing @@ -428,7 +429,7 @@ func TestLDBStoreAddRemove(t *testing.T) { log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) for i := 0; i < n; i++ { - ret, err := ldb.Get(chunks[i].Addr) + ret, err := ldb.Get(context.TODO(), chunks[i].Addr) if i%2 == 0 { // expect even chunks to be missing @@ -465,7 +466,7 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { } for i := 0; i < n; i++ { - ldb.Put(chunks[i]) + ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored before continuing @@ -494,7 +495,7 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { n = 10 for i := 0; i < n; i++ { - ldb.Put(chunks[i]) + ldb.Put(context.TODO(), chunks[i]) } // wait for all chunks to be stored before continuing @@ -504,14 +505,14 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { // expect for first chunk to be missing, because it has the smallest access value idx := 0 - ret, err := ldb.Get(chunks[idx].Addr) + ret, err := ldb.Get(context.TODO(), chunks[idx].Addr) if err == nil || ret != nil { t.Fatal("expected first chunk to be missing, but got no error") } // expect for last chunk to be present, as it has the largest access value idx = 9 - ret, err = ldb.Get(chunks[idx].Addr) + ret, err = ldb.Get(context.TODO(), chunks[idx].Addr) if err != nil { t.Fatalf("expected no error, but got %s", err) } diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 4c57086fa..096d150ae 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -17,6 +17,7 @@ package storage import ( + "context" "encoding/binary" "fmt" "path/filepath" @@ -96,7 +97,7 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { // when the chunk is stored in memstore. // After the LDBStore.Put, it is ensured that the MemStore // contains the chunk with the same data, but nil ReqC channel. -func (ls *LocalStore) Put(chunk *Chunk) { +func (ls *LocalStore) Put(ctx context.Context, chunk *Chunk) { if l := len(chunk.SData); l < 9 { log.Debug("incomplete chunk data", "addr", chunk.Addr, "length", l) chunk.SetErrored(ErrChunkInvalid) @@ -123,7 +124,7 @@ func (ls *LocalStore) Put(chunk *Chunk) { chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - memChunk, err := ls.memStore.Get(chunk.Addr) + memChunk, err := ls.memStore.Get(ctx, chunk.Addr) switch err { case nil: if memChunk.ReqC == nil { @@ -136,7 +137,7 @@ func (ls *LocalStore) Put(chunk *Chunk) { return } - ls.DbStore.Put(chunk) + ls.DbStore.Put(ctx, chunk) // chunk is no longer a request, but a chunk with data, so replace it in memStore newc := NewChunk(chunk.Addr, nil) @@ -144,7 +145,7 @@ func (ls *LocalStore) Put(chunk *Chunk) { newc.Size = chunk.Size newc.dbStoredC = chunk.dbStoredC - ls.memStore.Put(newc) + ls.memStore.Put(ctx, newc) if memChunk != nil && memChunk.ReqC != nil { close(memChunk.ReqC) @@ -155,15 +156,15 @@ func (ls *LocalStore) Put(chunk *Chunk) { // This method is blocking until the chunk is retrieved // so additional timeout may be needed to wrap this call if // ChunkStores are remote and can have long latency -func (ls *LocalStore) Get(addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { ls.mu.Lock() defer ls.mu.Unlock() - return ls.get(addr) + return ls.get(ctx, addr) } -func (ls *LocalStore) get(addr Address) (chunk *Chunk, err error) { - chunk, err = ls.memStore.Get(addr) +func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk *Chunk, err error) { + chunk, err = ls.memStore.Get(ctx, addr) if err == nil { if chunk.ReqC != nil { select { @@ -177,25 +178,25 @@ func (ls *LocalStore) get(addr Address) (chunk *Chunk, err error) { return } metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1) - chunk, err = ls.DbStore.Get(addr) + chunk, err = ls.DbStore.Get(ctx, addr) if err != nil { metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) return } chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - ls.memStore.Put(chunk) + ls.memStore.Put(ctx, chunk) return } // retrieve logic common for local and network chunk retrieval requests -func (ls *LocalStore) GetOrCreateRequest(addr Address) (chunk *Chunk, created bool) { +func (ls *LocalStore) GetOrCreateRequest(ctx context.Context, addr Address) (chunk *Chunk, created bool) { metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1) ls.mu.Lock() defer ls.mu.Unlock() var err error - chunk, err = ls.get(addr) + chunk, err = ls.get(ctx, addr) if err == nil && chunk.GetErrored() == nil { metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1) log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr)) @@ -210,7 +211,7 @@ func (ls *LocalStore) GetOrCreateRequest(addr Address) (chunk *Chunk, created bo metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1) log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr)) chunk = NewChunk(addr, make(chan bool)) - ls.memStore.Put(chunk) + ls.memStore.Put(ctx, chunk) return chunk, true } diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 7af31ffbd..55cfcbfea 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -19,6 +19,7 @@ package storage import ( + "context" "sync" lru "github.com/hashicorp/golang-lru" @@ -68,7 +69,7 @@ func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { } } -func (m *MemStore) Get(addr Address) (*Chunk, error) { +func (m *MemStore) Get(ctx context.Context, addr Address) (*Chunk, error) { if m.disabled { return nil, ErrChunkNotFound } @@ -90,7 +91,7 @@ func (m *MemStore) Get(addr Address) (*Chunk, error) { return c.(*Chunk), nil } -func (m *MemStore) Put(c *Chunk) { +func (m *MemStore) Put(ctx context.Context, c *Chunk) { if m.disabled { return } diff --git a/swarm/storage/memstore_test.go b/swarm/storage/memstore_test.go index 5c68a4b4b..2c1b0e89e 100644 --- a/swarm/storage/memstore_test.go +++ b/swarm/storage/memstore_test.go @@ -17,6 +17,7 @@ package storage import ( + "context" "crypto/rand" "encoding/binary" "io/ioutil" @@ -72,7 +73,7 @@ func TestMemStoreNotFound(t *testing.T) { m := newTestMemStore() defer m.Close() - _, err := m.Get(ZeroAddr) + _, err := m.Get(context.TODO(), ZeroAddr) if err != ErrChunkNotFound { t.Errorf("Expected ErrChunkNotFound, got %v", err) } @@ -187,8 +188,8 @@ func TestMemStoreAndLDBStore(t *testing.T) { } for i := 0; i < tt.n; i++ { - go ldb.Put(chunks[i]) - memStore.Put(chunks[i]) + go ldb.Put(context.TODO(), chunks[i]) + memStore.Put(context.TODO(), chunks[i]) if got := memStore.cache.Len(); got > cacheCap { t.Fatalf("expected to get cache capacity less than %v, but got %v", cacheCap, got) @@ -200,10 +201,10 @@ func TestMemStoreAndLDBStore(t *testing.T) { } for i := 0; i < tt.n; i++ { - _, err := memStore.Get(chunks[i].Addr) + _, err := memStore.Get(context.TODO(), chunks[i].Addr) if err != nil { if err == ErrChunkNotFound { - _, err := ldb.Get(chunks[i].Addr) + _, err := ldb.Get(context.TODO(), chunks[i].Addr) if err != nil { t.Fatalf("couldn't get chunk %v from ldb, got error: %v", i, err) } diff --git a/swarm/storage/mru/resource.go b/swarm/storage/mru/resource.go index 1e92a5e92..4f5a4f44c 100644 --- a/swarm/storage/mru/resource.go +++ b/swarm/storage/mru/resource.go @@ -125,6 +125,10 @@ type resource struct { updated time.Time } +func (r *resource) Context() context.Context { + return context.TODO() +} + // TODO Expire content after a defined period (to force resync) func (r *resource) isSynced() bool { return !r.updated.IsZero() @@ -134,7 +138,7 @@ func (r *resource) NameHash() common.Hash { return r.nameHash } -func (r *resource) Size(chan bool) (int64, error) { +func (r *resource) Size(context.Context, chan bool) (int64, error) { if !r.isSynced() { return 0, NewError(ErrNotSynced, "Not synced") } @@ -413,7 +417,7 @@ func (h *Handler) New(ctx context.Context, name string, frequency uint64) (stora chunk := h.newMetaChunk(name, currentblock, frequency) - h.chunkStore.Put(chunk) + h.chunkStore.Put(ctx, chunk) log.Debug("new resource", "name", name, "key", nameHash, "startBlock", currentblock, "frequency", frequency) // create the internal index for the resource and populate it with the data of the first version @@ -593,7 +597,7 @@ func (h *Handler) lookup(rsrc *resource, period uint32, version uint32, refresh return nil, NewError(ErrPeriodDepth, fmt.Sprintf("Lookup exceeded max period hops (%d)", maxLookup.Max)) } key := h.resourceHash(period, version, rsrc.nameHash) - chunk, err := h.chunkStore.GetWithTimeout(key, defaultRetrieveTimeout) + chunk, err := h.chunkStore.GetWithTimeout(context.TODO(), key, defaultRetrieveTimeout) if err == nil { if specificversion { return h.updateIndex(rsrc, chunk) @@ -603,7 +607,7 @@ func (h *Handler) lookup(rsrc *resource, period uint32, version uint32, refresh for { newversion := version + 1 key := h.resourceHash(period, newversion, rsrc.nameHash) - newchunk, err := h.chunkStore.GetWithTimeout(key, defaultRetrieveTimeout) + newchunk, err := h.chunkStore.GetWithTimeout(context.TODO(), key, defaultRetrieveTimeout) if err != nil { return h.updateIndex(rsrc, chunk) } @@ -621,8 +625,8 @@ func (h *Handler) lookup(rsrc *resource, period uint32, version uint32, refresh // Retrieves a resource metadata chunk and creates/updates the index entry for it // with the resulting metadata -func (h *Handler) Load(addr storage.Address) (*resource, error) { - chunk, err := h.chunkStore.GetWithTimeout(addr, defaultRetrieveTimeout) +func (h *Handler) Load(ctx context.Context, addr storage.Address) (*resource, error) { + chunk, err := h.chunkStore.GetWithTimeout(ctx, addr, defaultRetrieveTimeout) if err != nil { return nil, NewError(ErrNotFound, err.Error()) } @@ -890,7 +894,7 @@ func (h *Handler) update(ctx context.Context, name string, data []byte, multihas chunk := newUpdateChunk(key, signature, nextperiod, version, name, data, datalength) // send the chunk - h.chunkStore.Put(chunk) + h.chunkStore.Put(ctx, chunk) log.Trace("resource update", "name", name, "key", key, "currentblock", currentblock, "lastperiod", nextperiod, "version", version, "data", chunk.SData, "multihash", multihash) // update our resources map entry and return the new key diff --git a/swarm/storage/mru/resource_test.go b/swarm/storage/mru/resource_test.go index aa1860359..48387d981 100644 --- a/swarm/storage/mru/resource_test.go +++ b/swarm/storage/mru/resource_test.go @@ -182,7 +182,7 @@ func TestHandler(t *testing.T) { t.Fatal(err) } - chunk, err := rh.chunkStore.Get(storage.Address(rootChunkKey)) + chunk, err := rh.chunkStore.Get(context.TODO(), storage.Address(rootChunkKey)) if err != nil { t.Fatal(err) } else if len(chunk.SData) < 16 { @@ -256,7 +256,7 @@ func TestHandler(t *testing.T) { if err != nil { t.Fatal(err) } - rsrc2, err := rh2.Load(rootChunkKey) + rsrc2, err := rh2.Load(context.TODO(), rootChunkKey) _, err = rh2.LookupLatest(ctx, nameHash, true, nil) if err != nil { t.Fatal(err) @@ -754,7 +754,7 @@ func newTestSigner() (*GenericSigner, error) { } func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) { - chunk, err := rh.chunkStore.Get(addr) + chunk, err := rh.chunkStore.Get(context.TODO(), addr) if err != nil { return nil, err } diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 6a205cfa4..96a7e51f7 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -17,9 +17,12 @@ package storage import ( + "context" "time" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" ) var ( @@ -43,10 +46,10 @@ var ( // access by calling network is blocking with a timeout type NetStore struct { localStore *LocalStore - retrieve func(chunk *Chunk) error + retrieve func(ctx context.Context, chunk *Chunk) error } -func NewNetStore(localStore *LocalStore, retrieve func(chunk *Chunk) error) *NetStore { +func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore { return &NetStore{localStore, retrieve} } @@ -56,7 +59,14 @@ func NewNetStore(localStore *LocalStore, retrieve func(chunk *Chunk) error) *Net // Get uses get method to retrieve request, but retries if the // ErrChunkNotFound is returned by get, until the netStoreRetryTimeout // is reached. -func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) { +func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { + + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "netstore.get.global") + defer sp.Finish() + timer := time.NewTimer(netStoreRetryTimeout) defer timer.Stop() @@ -84,7 +94,7 @@ func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) { defer limiter.Stop() for { - chunk, err := ns.get(addr, 0) + chunk, err := ns.get(ctx, addr, 0) if err != ErrChunkNotFound { // break retry only if the error is nil // or other error then ErrChunkNotFound @@ -122,16 +132,23 @@ func (ns *NetStore) Get(addr Address) (chunk *Chunk, err error) { } // GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter -func (ns *NetStore) GetWithTimeout(addr Address, timeout time.Duration) (chunk *Chunk, err error) { - return ns.get(addr, timeout) +func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { + return ns.get(ctx, addr, timeout) } -func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err error) { +func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { if timeout == 0 { timeout = searchTimeout } + + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + "netstore.get") + defer sp.Finish() + if ns.retrieve == nil { - chunk, err = ns.localStore.Get(addr) + chunk, err = ns.localStore.Get(ctx, addr) if err == nil { return chunk, nil } @@ -140,14 +157,14 @@ func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err } } else { var created bool - chunk, created = ns.localStore.GetOrCreateRequest(addr) + chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr) if chunk.ReqC == nil { return chunk, nil } if created { - err := ns.retrieve(chunk) + err := ns.retrieve(ctx, chunk) if err != nil { // mark chunk request as failed so that we can retry it later chunk.SetErrored(ErrChunkUnavailable) @@ -171,8 +188,8 @@ func (ns *NetStore) get(addr Address, timeout time.Duration) (chunk *Chunk, err } // Put is the entrypoint for local store requests coming from storeLoop -func (ns *NetStore) Put(chunk *Chunk) { - ns.localStore.Put(chunk) +func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) { + ns.localStore.Put(ctx, chunk) } // Close chunk store diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index 432a799d8..7babbf5e0 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -17,6 +17,7 @@ package storage import ( + "context" "encoding/hex" "errors" "io/ioutil" @@ -46,7 +47,7 @@ func newDummyChunk(addr Address) *Chunk { return chunk } -func (m *mockRetrieve) retrieve(chunk *Chunk) error { +func (m *mockRetrieve) retrieve(ctx context.Context, chunk *Chunk) error { hkey := hex.EncodeToString(chunk.Addr) m.requests[hkey] += 1 @@ -100,7 +101,7 @@ func TestNetstoreFailedRequest(t *testing.T) { // } // second call - _, err = netStore.Get(key) + _, err = netStore.Get(context.TODO(), key) if got := r.requests[hex.EncodeToString(key)]; got != 2 { t.Fatalf("expected to have called retrieve two times, but got: %v", got) } @@ -109,7 +110,7 @@ func TestNetstoreFailedRequest(t *testing.T) { } // third call - chunk, err := netStore.Get(key) + chunk, err := netStore.Get(context.TODO(), key) if got := r.requests[hex.EncodeToString(key)]; got != 3 { t.Fatalf("expected to have called retrieve three times, but got: %v", got) } diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 6643e989a..2923c81c5 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -287,7 +287,7 @@ func (pc *PyramidChunker) processor(id int64) { func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { log.Debug("pyramid.chunker: processChunk()", "id", id) - ref, err := pc.putter.Put(job.chunk) + ref, err := pc.putter.Put(context.TODO(), job.chunk) if err != nil { pc.errC <- err } @@ -302,7 +302,7 @@ func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { func (pc *PyramidChunker) loadTree() error { log.Debug("pyramid.chunker: loadTree()") // Get the root chunk to get the total size - chunkData, err := pc.getter.Get(Reference(pc.key)) + chunkData, err := pc.getter.Get(context.TODO(), Reference(pc.key)) if err != nil { return errLoadingTreeRootChunk } @@ -355,7 +355,7 @@ func (pc *PyramidChunker) loadTree() error { branchCount = int64(len(ent.chunk)-8) / pc.hashSize for i := int64(0); i < branchCount; i++ { key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] - newChunkData, err := pc.getter.Get(Reference(key)) + newChunkData, err := pc.getter.Get(context.TODO(), Reference(key)) if err != nil { return errLoadingTreeChunk } @@ -417,7 +417,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] var err error - unfinishedChunkData, err = pc.getter.Get(lastKey) + unfinishedChunkData, err = pc.getter.Get(context.TODO(), lastKey) if err != nil { pc.errC <- err } diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 32880ead7..3114ef576 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -250,7 +250,8 @@ func GenerateRandomChunks(dataSize int64, count int) (chunks []*Chunk) { // Size, Seek, Read, ReadAt type LazySectionReader interface { - Size(chan bool) (int64, error) + Context() context.Context + Size(context.Context, chan bool) (int64, error) io.Seeker io.Reader io.ReaderAt @@ -260,10 +261,14 @@ type LazyTestSectionReader struct { *io.SectionReader } -func (r *LazyTestSectionReader) Size(chan bool) (int64, error) { +func (r *LazyTestSectionReader) Size(context.Context, chan bool) (int64, error) { return r.SectionReader.Size(), nil } +func (r *LazyTestSectionReader) Context() context.Context { + return context.TODO() +} + type StoreParams struct { Hash SwarmHasher `toml:"-"` DbCapacity uint64 @@ -298,7 +303,7 @@ type Reference []byte // Putter is responsible to store data and create a reference for it type Putter interface { - Put(ChunkData) (Reference, error) + Put(context.Context, ChunkData) (Reference, error) // RefSize returns the length of the Reference created by this Putter RefSize() int64 // Close is to indicate that no more chunk data will be Put on this Putter @@ -309,7 +314,7 @@ type Putter interface { // Getter is an interface to retrieve a chunk's data by its reference type Getter interface { - Get(Reference) (ChunkData, error) + Get(context.Context, Reference) (ChunkData, error) } // NOTE: this returns invalid data if chunk is encrypted |