diff options
author | Balint Gabor <balint.g@gmail.com> | 2018-09-13 17:42:19 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-13 17:42:19 +0800 |
commit | 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch) | |
tree | 62a2896b3b824449595272f0b92dda877ba1c58d /swarm/storage/mru | |
parent | ff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff) | |
download | go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip |
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org>
Co-authored-by: Balint Gabor <balint.g@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/storage/mru')
-rw-r--r-- | swarm/storage/mru/handler.go | 47 | ||||
-rw-r--r-- | swarm/storage/mru/lookup.go | 6 | ||||
-rw-r--r-- | swarm/storage/mru/metadata.go | 6 | ||||
-rw-r--r-- | swarm/storage/mru/request.go | 2 | ||||
-rw-r--r-- | swarm/storage/mru/resource_test.go | 46 | ||||
-rw-r--r-- | swarm/storage/mru/signedupdate.go | 9 | ||||
-rw-r--r-- | swarm/storage/mru/testutil.go | 21 | ||||
-rw-r--r-- | swarm/storage/mru/updateheader.go | 4 |
8 files changed, 82 insertions, 59 deletions
diff --git a/swarm/storage/mru/handler.go b/swarm/storage/mru/handler.go index 57561fd14..18c667f14 100644 --- a/swarm/storage/mru/handler.go +++ b/swarm/storage/mru/handler.go @@ -187,12 +187,12 @@ func (h *Handler) New(ctx context.Context, request *Request) error { return err } if request.metaHash != nil && !bytes.Equal(request.metaHash, metaHash) || - request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Addr) { + request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Address()) { return NewError(ErrInvalidValue, "metaHash in UpdateRequest does not match actual metadata") } request.metaHash = metaHash - request.rootAddr = chunk.Addr + request.rootAddr = chunk.Address() h.chunkStore.Put(ctx, chunk) log.Debug("new resource", "name", request.metadata.Name, "startTime", request.metadata.StartTime, "frequency", request.metadata.Frequency, "owner", request.metadata.Owner) @@ -202,14 +202,14 @@ func (h *Handler) New(ctx context.Context, request *Request) error { resourceUpdate: resourceUpdate{ updateHeader: updateHeader{ UpdateLookup: UpdateLookup{ - rootAddr: chunk.Addr, + rootAddr: chunk.Address(), }, }, }, ResourceMetadata: request.metadata, updated: time.Now(), } - h.set(chunk.Addr, rsrc) + h.set(chunk.Address(), rsrc) return nil } @@ -348,7 +348,11 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error return nil, NewErrorf(ErrPeriodDepth, "Lookup exceeded max period hops (%d)", lp.Limit) } updateAddr := lp.UpdateAddr() - chunk, err := h.chunkStore.GetWithTimeout(context.TODO(), updateAddr, defaultRetrieveTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) + defer cancel() + + chunk, err := h.chunkStore.Get(ctx, updateAddr) if err == nil { if specificversion { return h.updateIndex(rsrc, chunk) @@ -358,7 +362,11 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error for { newversion := lp.version + 1 updateAddr := lp.UpdateAddr() - newchunk, err := h.chunkStore.GetWithTimeout(context.TODO(), updateAddr, defaultRetrieveTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) + defer cancel() + + newchunk, err := h.chunkStore.Get(ctx, updateAddr) if err != nil { return h.updateIndex(rsrc, chunk) } @@ -380,7 +388,10 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error // Load retrieves the Mutable Resource metadata chunk stored at rootAddr // Upon retrieval it creates/updates the index entry for it with metadata corresponding to the chunk contents func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource, error) { - chunk, err := h.chunkStore.GetWithTimeout(ctx, rootAddr, defaultRetrieveTimeout) + //TODO: Maybe add timeout to context, defaultRetrieveTimeout? + ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) + defer cancel() + chunk, err := h.chunkStore.Get(ctx, rootAddr) if err != nil { return nil, NewError(ErrNotFound, err.Error()) } @@ -388,11 +399,11 @@ func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource // create the index entry rsrc := &resource{} - if err := rsrc.ResourceMetadata.binaryGet(chunk.SData); err != nil { // Will fail if this is not really a metadata chunk + if err := rsrc.ResourceMetadata.binaryGet(chunk.Data()); err != nil { // Will fail if this is not really a metadata chunk return nil, err } - rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.SData) + rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.Data()) if !bytes.Equal(rsrc.rootAddr, rootAddr) { return nil, NewError(ErrCorruptData, "Corrupt metadata chunk") } @@ -402,17 +413,17 @@ func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource } // update mutable resource index map with specified content -func (h *Handler) updateIndex(rsrc *resource, chunk *storage.Chunk) (*resource, error) { +func (h *Handler) updateIndex(rsrc *resource, chunk storage.Chunk) (*resource, error) { // retrieve metadata from chunk data and check that it matches this mutable resource var r SignedResourceUpdate - if err := r.fromChunk(chunk.Addr, chunk.SData); err != nil { + if err := r.fromChunk(chunk.Address(), chunk.Data()); err != nil { return nil, err } - log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Addr, "period", r.period, "version", r.version) + log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Address(), "period", r.period, "version", r.version) // update our rsrcs entry map - rsrc.lastKey = chunk.Addr + rsrc.lastKey = chunk.Address() rsrc.period = r.period rsrc.version = r.version rsrc.updated = time.Now() @@ -420,8 +431,8 @@ func (h *Handler) updateIndex(rsrc *resource, chunk *storage.Chunk) (*resource, rsrc.multihash = r.multihash copy(rsrc.data, r.data) rsrc.Reader = bytes.NewReader(rsrc.data) - log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Addr, "period", rsrc.period, "version", rsrc.version) - h.set(chunk.Addr, rsrc) + log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Address(), "period", rsrc.period, "version", rsrc.version) + h.set(chunk.Address(), rsrc) return rsrc, nil } @@ -457,7 +468,7 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd // send the chunk h.chunkStore.Put(ctx, chunk) - log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.SData, "multihash", r.multihash) + log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.Data(), "multihash", r.multihash) // update our resources map entry if the new update is older than the one we have, if we have it. if rsrc != nil && (r.period > rsrc.period || (rsrc.period == r.period && r.version > rsrc.version)) { @@ -475,7 +486,7 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd // Retrieves the resource index value for the given nameHash func (h *Handler) get(rootAddr storage.Address) *resource { - if len(rootAddr) < storage.KeyLength { + if len(rootAddr) < storage.AddressLength { log.Warn("Handler.get with invalid rootAddr") return nil } @@ -488,7 +499,7 @@ func (h *Handler) get(rootAddr storage.Address) *resource { // Sets the resource index value for the given nameHash func (h *Handler) set(rootAddr storage.Address, rsrc *resource) { - if len(rootAddr) < storage.KeyLength { + if len(rootAddr) < storage.AddressLength { log.Warn("Handler.set with invalid rootAddr") return } diff --git a/swarm/storage/mru/lookup.go b/swarm/storage/mru/lookup.go index eb28336e1..b52cd5b4f 100644 --- a/swarm/storage/mru/lookup.go +++ b/swarm/storage/mru/lookup.go @@ -72,7 +72,7 @@ type UpdateLookup struct { // 4 bytes period // 4 bytes version // storage.Keylength for rootAddr -const updateLookupLength = 4 + 4 + storage.KeyLength +const updateLookupLength = 4 + 4 + storage.AddressLength // UpdateAddr calculates the resource update chunk address corresponding to this lookup key func (u *UpdateLookup) UpdateAddr() (updateAddr storage.Address) { @@ -90,7 +90,7 @@ func (u *UpdateLookup) binaryPut(serializedData []byte) error { if len(serializedData) != updateLookupLength { return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize UpdateLookup. Expected %d, got %d", updateLookupLength, len(serializedData)) } - if len(u.rootAddr) != storage.KeyLength { + if len(u.rootAddr) != storage.AddressLength { return NewError(ErrInvalidValue, "UpdateLookup.binaryPut called without rootAddr set") } binary.LittleEndian.PutUint32(serializedData[:4], u.period) @@ -111,7 +111,7 @@ func (u *UpdateLookup) binaryGet(serializedData []byte) error { } u.period = binary.LittleEndian.Uint32(serializedData[:4]) u.version = binary.LittleEndian.Uint32(serializedData[4:8]) - u.rootAddr = storage.Address(make([]byte, storage.KeyLength)) + u.rootAddr = storage.Address(make([]byte, storage.AddressLength)) copy(u.rootAddr[:], serializedData[8:]) return nil } diff --git a/swarm/storage/mru/metadata.go b/swarm/storage/mru/metadata.go index 0ab0ed1d9..509114895 100644 --- a/swarm/storage/mru/metadata.go +++ b/swarm/storage/mru/metadata.go @@ -142,7 +142,7 @@ func (r *ResourceMetadata) serializeAndHash() (rootAddr, metaHash []byte, chunkD } // creates a metadata chunk out of a resourceMetadata structure -func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []byte, err error) { +func (metadata *ResourceMetadata) newChunk() (chunk storage.Chunk, metaHash []byte, err error) { // the metadata chunk contains a timestamp of when the resource starts to be valid // and also how frequently it is expected to be updated // from this we know at what time we should look for updates, and how often @@ -157,9 +157,7 @@ func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []b } // make the chunk and send it to swarm - chunk = storage.NewChunk(rootAddr, nil) - chunk.SData = chunkData - chunk.Size = int64(len(chunkData)) + chunk = storage.NewChunk(rootAddr, chunkData) return chunk, metaHash, nil } diff --git a/swarm/storage/mru/request.go b/swarm/storage/mru/request.go index dd71f855d..af2ccf5c7 100644 --- a/swarm/storage/mru/request.go +++ b/swarm/storage/mru/request.go @@ -182,7 +182,7 @@ func (r *Request) fromJSON(j *updateRequestJSON) error { var declaredRootAddr storage.Address var declaredMetaHash []byte - declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.KeyLength, "rootAddr") + declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.AddressLength, "rootAddr") if err != nil { return err } diff --git a/swarm/storage/mru/resource_test.go b/swarm/storage/mru/resource_test.go index 76d7c58a1..0fb465bb0 100644 --- a/swarm/storage/mru/resource_test.go +++ b/swarm/storage/mru/resource_test.go @@ -87,8 +87,7 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { resourceUpdate: resourceUpdate{ updateHeader: updateHeader{ UpdateLookup: UpdateLookup{ - - rootAddr: make([]byte, 79), // put the wrong length, should be storage.KeyLength + rootAddr: make([]byte, 79), // put the wrong length, should be storage.AddressLength }, metaHash: nil, multihash: false, @@ -99,8 +98,8 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { if err == nil { t.Fatal("Expected newUpdateChunk to fail when rootAddr or metaHash have the wrong length") } - r.rootAddr = make([]byte, storage.KeyLength) - r.metaHash = make([]byte, storage.KeyLength) + r.rootAddr = make([]byte, storage.AddressLength) + r.metaHash = make([]byte, storage.AddressLength) _, err = r.toChunk() if err == nil { t.Fatal("Expected newUpdateChunk to fail when there is no data") @@ -197,7 +196,7 @@ func TestReverse(t *testing.T) { // check that we can recover the owner account from the update chunk's signature var checkUpdate SignedResourceUpdate - if err := checkUpdate.fromChunk(chunk.Addr, chunk.SData); err != nil { + if err := checkUpdate.fromChunk(chunk.Address(), chunk.Data()); err != nil { t.Fatal(err) } checkdigest, err := checkUpdate.GetDigest() @@ -215,8 +214,8 @@ func TestReverse(t *testing.T) { t.Fatalf("addresses dont match: %x != %x", originaladdress, recoveredaddress) } - if !bytes.Equal(key[:], chunk.Addr[:]) { - t.Fatalf("Expected chunk key '%x', was '%x'", key, chunk.Addr) + if !bytes.Equal(key[:], chunk.Address()[:]) { + t.Fatalf("Expected chunk key '%x', was '%x'", key, chunk.Address()) } if period != checkUpdate.period { t.Fatalf("Expected period '%d', was '%d'", period, checkUpdate.period) @@ -270,16 +269,16 @@ func TestResourceHandler(t *testing.T) { t.Fatal(err) } - chunk, err := rh.chunkStore.Get(context.TODO(), storage.Address(request.rootAddr)) + chunk, err := rh.chunkStore.Get(ctx, storage.Address(request.rootAddr)) if err != nil { t.Fatal(err) - } else if len(chunk.SData) < 16 { - t.Fatalf("chunk data must be minimum 16 bytes, is %d", len(chunk.SData)) + } else if len(chunk.Data()) < 16 { + t.Fatalf("chunk data must be minimum 16 bytes, is %d", len(chunk.Data())) } var recoveredMetadata ResourceMetadata - recoveredMetadata.binaryGet(chunk.SData) + recoveredMetadata.binaryGet(chunk.Data()) if err != nil { t.Fatal(err) } @@ -704,7 +703,7 @@ func TestValidator(t *testing.T) { if err != nil { t.Fatal(err) } - if !rh.Validate(chunk.Addr, chunk.SData) { + if !rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator fail on update chunk") } @@ -724,7 +723,7 @@ func TestValidator(t *testing.T) { t.Fatal(err) } - if rh.Validate(chunk.Addr, chunk.SData) { + if rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator did not fail on update chunk with false address") } @@ -742,7 +741,7 @@ func TestValidator(t *testing.T) { t.Fatal(err) } - if !rh.Validate(chunk.Addr, chunk.SData) { + if !rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator fail on metadata chunk") } } @@ -783,8 +782,7 @@ func TestValidatorInStore(t *testing.T) { // create content addressed chunks, one good, one faulty chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2) goodChunk := chunks[0] - badChunk := chunks[1] - badChunk.SData = goodChunk.SData + badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data()) metadata := &ResourceMetadata{ StartTime: startTime, @@ -801,7 +799,7 @@ func TestValidatorInStore(t *testing.T) { updateLookup := UpdateLookup{ period: 42, version: 1, - rootAddr: rootChunk.Addr, + rootAddr: rootChunk.Address(), } updateAddr := updateLookup.UpdateAddr() @@ -826,16 +824,16 @@ func TestValidatorInStore(t *testing.T) { } // put the chunks in the store and check their error status - storage.PutChunks(store, goodChunk) - if goodChunk.GetErrored() == nil { + err = store.Put(context.Background(), goodChunk) + if err == nil { t.Fatal("expected error on good content address chunk with resource validator only, but got nil") } - storage.PutChunks(store, badChunk) - if badChunk.GetErrored() == nil { + err = store.Put(context.Background(), badChunk) + if err == nil { t.Fatal("expected error on bad content address chunk with resource validator only, but got nil") } - storage.PutChunks(store, uglyChunk) - if err := uglyChunk.GetErrored(); err != nil { + err = store.Put(context.Background(), uglyChunk) + if err != nil { t.Fatalf("expected no error on resource update chunk with resource validator only, but got: %s", err) } } @@ -897,7 +895,7 @@ func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) { return nil, err } var r SignedResourceUpdate - if err := r.fromChunk(addr, chunk.SData); err != nil { + if err := r.fromChunk(addr, chunk.Data()); err != nil { return nil, err } return r.data, nil diff --git a/swarm/storage/mru/signedupdate.go b/swarm/storage/mru/signedupdate.go index 1c6d02e82..41a5a5e63 100644 --- a/swarm/storage/mru/signedupdate.go +++ b/swarm/storage/mru/signedupdate.go @@ -96,7 +96,7 @@ func (r *SignedResourceUpdate) Sign(signer Signer) error { } // create an update chunk. -func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) { +func (r *SignedResourceUpdate) toChunk() (storage.Chunk, error) { // Check that the update is signed and serialized // For efficiency, data is serialized during signature and cached in @@ -105,14 +105,11 @@ func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) { return nil, NewError(ErrInvalidSignature, "newUpdateChunk called without a valid signature or payload data. Call .Sign() first.") } - chunk := storage.NewChunk(r.updateAddr, nil) resourceUpdateLength := r.resourceUpdate.binaryLength() - chunk.SData = r.binaryData - // signature is the last item in the chunk data - copy(chunk.SData[resourceUpdateLength:], r.signature[:]) + copy(r.binaryData[resourceUpdateLength:], r.signature[:]) - chunk.Size = int64(len(chunk.SData)) + chunk := storage.NewChunk(r.updateAddr, r.binaryData) return chunk, nil } diff --git a/swarm/storage/mru/testutil.go b/swarm/storage/mru/testutil.go index 6efcba9ab..a30baaa1d 100644 --- a/swarm/storage/mru/testutil.go +++ b/swarm/storage/mru/testutil.go @@ -17,8 +17,12 @@ package mru import ( + "context" "fmt" "path/filepath" + "sync" + + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -35,6 +39,17 @@ func (t *TestHandler) Close() { t.chunkStore.Close() } +type mockNetFetcher struct{} + +func (m *mockNetFetcher) Request(ctx context.Context) { +} +func (m *mockNetFetcher) Offer(ctx context.Context, source *discover.NodeID) { +} + +func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher { + return &mockNetFetcher{} +} + // NewTestHandler creates Handler object to be used for testing purposes. func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) { path := filepath.Join(datadir, testDbDirName) @@ -47,7 +62,11 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) } localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(resourceHashAlgorithm))) localStore.Validators = append(localStore.Validators, rh) - netStore := storage.NewNetStore(localStore, nil) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, err + } + netStore.NewNetFetcherFunc = newFakeNetFetcher rh.SetStore(netStore) return &TestHandler{rh}, nil } diff --git a/swarm/storage/mru/updateheader.go b/swarm/storage/mru/updateheader.go index 3ac20c189..f0039eaf6 100644 --- a/swarm/storage/mru/updateheader.go +++ b/swarm/storage/mru/updateheader.go @@ -27,7 +27,7 @@ type updateHeader struct { metaHash []byte // SHA3 hash of the metadata chunk (less ownerAddr). Used to prove ownerhsip of the resource. } -const metaHashLength = storage.KeyLength +const metaHashLength = storage.AddressLength // updateLookupLength bytes // 1 byte flags (multihash bool for now) @@ -76,7 +76,7 @@ func (h *updateHeader) binaryGet(serializedData []byte) error { } cursor := updateLookupLength h.metaHash = make([]byte, metaHashLength) - copy(h.metaHash[:storage.KeyLength], serializedData[cursor:cursor+storage.KeyLength]) + copy(h.metaHash[:storage.AddressLength], serializedData[cursor:cursor+storage.AddressLength]) cursor += metaHashLength flags := serializedData[cursor] |