diff options
Diffstat (limited to 'swarm/storage/mru/handler.go')
-rw-r--r-- | swarm/storage/mru/handler.go | 514 |
1 files changed, 514 insertions, 0 deletions
diff --git a/swarm/storage/mru/handler.go b/swarm/storage/mru/handler.go new file mode 100644 index 000000000..188b986b8 --- /dev/null +++ b/swarm/storage/mru/handler.go @@ -0,0 +1,514 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Handler is the API for Mutable Resources +// It enables creating, updating, syncing and retrieving resources and their update data +package mru + +import ( + "bytes" + "context" + "fmt" + "sync" + "time" + "unsafe" + + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +const chunkSize = 4096 // temporary until we implement FileStore in the resourcehandler + +type Handler struct { + chunkStore *storage.NetStore + HashSize int + resources map[uint64]*resource + resourceLock sync.RWMutex + storeTimeout time.Duration + queryMaxPeriods uint32 +} + +// HandlerParams pass parameters to the Handler constructor NewHandler +// Signer and TimestampProvider are mandatory parameters +type HandlerParams struct { + QueryMaxPeriods uint32 +} + +// hashPool contains a pool of ready hashers +var hashPool sync.Pool +var minimumChunkLength int + +// init initializes the package and hashPool +func init() { + hashPool = sync.Pool{ + New: func() interface{} { + return storage.MakeHashFunc(resourceHashAlgorithm)() + }, + } + if minimumMetadataLength < minimumUpdateDataLength { + minimumChunkLength = minimumMetadataLength + } else { + minimumChunkLength = minimumUpdateDataLength + } +} + +// NewHandler creates a new Mutable Resource API +func NewHandler(params *HandlerParams) (*Handler, error) { + + rh := &Handler{ + resources: make(map[uint64]*resource), + storeTimeout: defaultStoreTimeout, + queryMaxPeriods: params.QueryMaxPeriods, + } + + for i := 0; i < hasherCount; i++ { + hashfunc := storage.MakeHashFunc(resourceHashAlgorithm)() + if rh.HashSize == 0 { + rh.HashSize = hashfunc.Size() + } + hashPool.Put(hashfunc) + } + + return rh, nil +} + +// SetStore sets the store backend for the Mutable Resource API +func (h *Handler) SetStore(store *storage.NetStore) { + h.chunkStore = store +} + +// Validate is a chunk validation method +// If it looks like a resource update, the chunk address is checked against the ownerAddr of the update's signature +// It implements the storage.ChunkValidator interface +func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool { + + dataLength := len(data) + if dataLength < minimumChunkLength { + return false + } + + //metadata chunks have the first two bytes set to zero + if data[0] == 0 && data[1] == 0 && dataLength >= minimumMetadataLength { + //metadata chunk + rootAddr, _ := metadataHash(data) + valid := bytes.Equal(chunkAddr, rootAddr) + if !valid { + log.Debug(fmt.Sprintf("Invalid root metadata chunk with address: %s", chunkAddr.Hex())) + } + return valid + } + + // if it is not a metadata chunk, check if it is a properly formatted update chunk with + // valid signature and proof of ownership of the resource it is trying + // to update + + // First, deserialize the chunk + var r SignedResourceUpdate + if err := r.fromChunk(chunkAddr, data); err != nil { + log.Debug("Invalid resource chunk with address %s: %s ", chunkAddr.Hex(), err.Error()) + return false + } + + // check that the lookup information contained in the chunk matches the updateAddr (chunk search key) + // that was used to retrieve this chunk + // if this validation fails, someone forged a chunk. + if !bytes.Equal(chunkAddr, r.updateHeader.UpdateAddr()) { + log.Debug("period,version,rootAddr contained in update chunk do not match updateAddr %s", chunkAddr.Hex()) + return false + } + + // Verify signatures and that the signer actually owns the resource + // If it fails, it means either the signature is not valid, data is corrupted + // or someone is trying to update someone else's resource. + if err := r.Verify(); err != nil { + log.Debug("Invalid signature: %v", err) + return false + } + + return true +} + +// GetContent retrieves the data payload of the last synced update of the Mutable Resource +func (h *Handler) GetContent(rootAddr storage.Address) (storage.Address, []byte, error) { + rsrc := h.get(rootAddr) + if rsrc == nil || !rsrc.isSynced() { + return nil, nil, NewError(ErrNotFound, " does not exist or is not synced") + } + return rsrc.lastKey, rsrc.data, nil +} + +// GetLastPeriod retrieves the period of the last synced update of the Mutable Resource +func (h *Handler) GetLastPeriod(rootAddr storage.Address) (uint32, error) { + rsrc := h.get(rootAddr) + if rsrc == nil { + return 0, NewError(ErrNotFound, " does not exist") + } else if !rsrc.isSynced() { + return 0, NewError(ErrNotSynced, " is not synced") + } + return rsrc.period, nil +} + +// GetVersion retrieves the period of the last synced update of the Mutable Resource +func (h *Handler) GetVersion(rootAddr storage.Address) (uint32, error) { + rsrc := h.get(rootAddr) + if rsrc == nil { + return 0, NewError(ErrNotFound, " does not exist") + } else if !rsrc.isSynced() { + return 0, NewError(ErrNotSynced, " is not synced") + } + return rsrc.version, nil +} + +// \TODO should be hashsize * branches from the chosen chunker, implement with FileStore +func (h *Handler) chunkSize() int64 { + return chunkSize +} + +// New creates a new metadata chunk out of the request passed in. +func (h *Handler) New(ctx context.Context, request *Request) error { + + // frequency 0 is invalid + if request.metadata.Frequency == 0 { + return NewError(ErrInvalidValue, "frequency cannot be 0 when creating a resource") + } + + // make sure owner is set to something + if request.metadata.Owner == zeroAddr { + return NewError(ErrInvalidValue, "ownerAddr must be set to create a new metadata chunk") + } + + // create the meta chunk and store it in swarm + chunk, metaHash, err := request.metadata.newChunk() + if err != nil { + return err + } + if request.metaHash != nil && !bytes.Equal(request.metaHash, metaHash) || + request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Addr) { + return NewError(ErrInvalidValue, "metaHash in UpdateRequest does not match actual metadata") + } + + request.metaHash = metaHash + request.rootAddr = chunk.Addr + + h.chunkStore.Put(ctx, chunk) + log.Debug("new resource", "name", request.metadata.Name, "startTime", request.metadata.StartTime, "frequency", request.metadata.Frequency, "owner", request.metadata.Owner) + + // create the internal index for the resource and populate it with its metadata + rsrc := &resource{ + resourceUpdate: resourceUpdate{ + updateHeader: updateHeader{ + UpdateLookup: UpdateLookup{ + rootAddr: chunk.Addr, + }, + }, + }, + ResourceMetadata: request.metadata, + updated: time.Now(), + } + h.set(chunk.Addr, rsrc) + + return nil +} + +// NewUpdateRequest prepares an UpdateRequest structure with all the necessary information to +// just add the desired data and sign it. +// The resulting structure can then be signed and passed to Handler.Update to be verified and sent +func (h *Handler) NewUpdateRequest(ctx context.Context, rootAddr storage.Address) (updateRequest *Request, err error) { + + if rootAddr == nil { + return nil, NewError(ErrInvalidValue, "rootAddr cannot be nil") + } + + // Make sure we have a cache of the metadata chunk + rsrc, err := h.Load(ctx, rootAddr) + if err != nil { + return nil, err + } + + now := TimestampProvider.Now() + + updateRequest = new(Request) + updateRequest.period, err = getNextPeriod(rsrc.StartTime.Time, now.Time, rsrc.Frequency) + if err != nil { + return nil, err + } + + if _, err = h.lookup(rsrc, LookupLatestVersionInPeriod(rsrc.rootAddr, updateRequest.period)); err != nil { + if err.(*Error).code != ErrNotFound { + return nil, err + } + // not finding updates means that there is a network error + // or that the resource really does not have updates in this period. + } + + updateRequest.multihash = rsrc.multihash + updateRequest.rootAddr = rsrc.rootAddr + updateRequest.metaHash = rsrc.metaHash + updateRequest.metadata = rsrc.ResourceMetadata + + // if we already have an update for this period then increment version + // resource object MUST be in sync for version to be correct, but we checked this earlier in the method already + if h.hasUpdate(rootAddr, updateRequest.period) { + updateRequest.version = rsrc.version + 1 + } else { + updateRequest.version = 1 + } + + return updateRequest, nil +} + +// Lookup retrieves a specific or latest version of the resource update with metadata chunk at params.Root +// Lookup works differently depending on the configuration of `LookupParams` +// See the `LookupParams` documentation and helper functions: +// `LookupLatest`, `LookupLatestVersionInPeriod` and `LookupVersion` +// When looking for the latest update, it starts at the next period after the current time. +// upon failure tries the corresponding keys of each previous period until one is found +// (or startTime is reached, in which case there are no updates). +func (h *Handler) Lookup(ctx context.Context, params *LookupParams) (*resource, error) { + + rsrc := h.get(params.rootAddr) + if rsrc == nil { + return nil, NewError(ErrNothingToReturn, "resource not loaded") + } + return h.lookup(rsrc, params) +} + +// LookupPrevious returns the resource before the one currently loaded in the resource cache +// This is useful where resource updates are used incrementally in contrast to +// merely replacing content. +// Requires a cached resource object to determine the current state of the resource. +func (h *Handler) LookupPrevious(ctx context.Context, params *LookupParams) (*resource, error) { + rsrc := h.get(params.rootAddr) + if rsrc == nil { + return nil, NewError(ErrNothingToReturn, "resource not loaded") + } + if !rsrc.isSynced() { + return nil, NewError(ErrNotSynced, "LookupPrevious requires synced resource.") + } else if rsrc.period == 0 { + return nil, NewError(ErrNothingToReturn, " not found") + } + var version, period uint32 + if rsrc.version > 1 { + version = rsrc.version - 1 + period = rsrc.period + } else if rsrc.period == 1 { + return nil, NewError(ErrNothingToReturn, "Current update is the oldest") + } else { + version = 0 + period = rsrc.period - 1 + } + return h.lookup(rsrc, NewLookupParams(rsrc.rootAddr, period, version, params.Limit)) +} + +// base code for public lookup methods +func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error) { + + lp := *params + // we can't look for anything without a store + if h.chunkStore == nil { + return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups") + } + + var specificperiod bool + if lp.period > 0 { + specificperiod = true + } else { + // get the current time and the next period + now := TimestampProvider.Now() + + var period uint32 + period, err := getNextPeriod(rsrc.StartTime.Time, now.Time, rsrc.Frequency) + if err != nil { + return nil, err + } + lp.period = period + } + + // start from the last possible period, and iterate previous ones + // (unless we want a specific period only) until we find a match. + // If we hit startTime we're out of options + var specificversion bool + if lp.version > 0 { + specificversion = true + } else { + lp.version = 1 + } + + var hops uint32 + if lp.Limit == 0 { + lp.Limit = h.queryMaxPeriods + } + log.Trace("resource lookup", "period", lp.period, "version", lp.version, "limit", lp.Limit) + for lp.period > 0 { + if lp.Limit != 0 && hops > lp.Limit { + return nil, NewErrorf(ErrPeriodDepth, "Lookup exceeded max period hops (%d)", lp.Limit) + } + updateAddr := lp.UpdateAddr() + chunk, err := h.chunkStore.GetWithTimeout(context.TODO(), updateAddr, defaultRetrieveTimeout) + if err == nil { + if specificversion { + return h.updateIndex(rsrc, chunk) + } + // check if we have versions > 1. If a version fails, the previous version is used and returned. + log.Trace("rsrc update version 1 found, checking for version updates", "period", lp.period, "updateAddr", updateAddr) + for { + newversion := lp.version + 1 + updateAddr := lp.UpdateAddr() + newchunk, err := h.chunkStore.GetWithTimeout(context.TODO(), updateAddr, defaultRetrieveTimeout) + if err != nil { + return h.updateIndex(rsrc, chunk) + } + chunk = newchunk + lp.version = newversion + log.Trace("version update found, checking next", "version", lp.version, "period", lp.period, "updateAddr", updateAddr) + } + } + if specificperiod { + break + } + log.Trace("rsrc update not found, checking previous period", "period", lp.period, "updateAddr", updateAddr) + lp.period-- + hops++ + } + return nil, NewError(ErrNotFound, "no updates found") +} + +// Load retrieves the Mutable Resource metadata chunk stored at rootAddr +// Upon retrieval it creates/updates the index entry for it with metadata corresponding to the chunk contents +func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource, error) { + chunk, err := h.chunkStore.GetWithTimeout(ctx, rootAddr, defaultRetrieveTimeout) + if err != nil { + return nil, NewError(ErrNotFound, err.Error()) + } + + // create the index entry + rsrc := &resource{} + + if err := rsrc.ResourceMetadata.binaryGet(chunk.SData); err != nil { // Will fail if this is not really a metadata chunk + return nil, err + } + + rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.SData) + if !bytes.Equal(rsrc.rootAddr, rootAddr) { + return nil, NewError(ErrCorruptData, "Corrupt metadata chunk") + } + h.set(rootAddr, rsrc) + log.Trace("resource index load", "rootkey", rootAddr, "name", rsrc.ResourceMetadata.Name, "starttime", rsrc.ResourceMetadata.StartTime, "frequency", rsrc.ResourceMetadata.Frequency) + return rsrc, nil +} + +// update mutable resource index map with specified content +func (h *Handler) updateIndex(rsrc *resource, chunk *storage.Chunk) (*resource, error) { + + // retrieve metadata from chunk data and check that it matches this mutable resource + var r SignedResourceUpdate + if err := r.fromChunk(chunk.Addr, chunk.SData); err != nil { + return nil, err + } + log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Addr, "period", r.period, "version", r.version) + + // update our rsrcs entry map + rsrc.lastKey = chunk.Addr + rsrc.period = r.period + rsrc.version = r.version + rsrc.updated = time.Now() + rsrc.data = make([]byte, len(r.data)) + rsrc.multihash = r.multihash + copy(rsrc.data, r.data) + rsrc.Reader = bytes.NewReader(rsrc.data) + log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Addr, "period", rsrc.period, "version", rsrc.version) + h.set(chunk.Addr, rsrc) + return rsrc, nil +} + +// Update adds an actual data update +// Uses the Mutable Resource metadata currently loaded in the resources map entry. +// It is the caller's responsibility to make sure that this data is not stale. +// Note that a Mutable Resource update cannot span chunks, and thus has a MAX NET LENGTH 4096, INCLUDING update header data and signature. An error will be returned if the total length of the chunk payload will exceed this limit. +// Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update +// on the network. +func (h *Handler) Update(ctx context.Context, r *SignedResourceUpdate) (storage.Address, error) { + return h.update(ctx, r) +} + +// create and commit an update +func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAddr storage.Address, err error) { + + // we can't update anything without a store + if h.chunkStore == nil { + return nil, NewError(ErrInit, "Call Handler.SetStore() before updating") + } + + rsrc := h.get(r.rootAddr) + if rsrc != nil && rsrc.period != 0 && rsrc.version != 0 && // This is the only cheap check we can do for sure + rsrc.period == r.period && rsrc.version >= r.version { // without having to lookup update chunks + + return nil, NewError(ErrInvalidValue, "A former update in this period is already known to exist") + } + + chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big + if err != nil { + return nil, err + } + + // send the chunk + h.chunkStore.Put(ctx, chunk) + log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.SData, "multihash", r.multihash) + + // update our resources map entry if the new update is older than the one we have, if we have it. + if rsrc != nil && r.period > rsrc.period || (rsrc.period == r.period && r.version > rsrc.version) { + rsrc.period = r.period + rsrc.version = r.version + rsrc.data = make([]byte, len(r.data)) + rsrc.updated = time.Now() + rsrc.lastKey = r.updateAddr + rsrc.multihash = r.multihash + copy(rsrc.data, r.data) + rsrc.Reader = bytes.NewReader(rsrc.data) + } + return r.updateAddr, nil +} + +// Retrieves the resource index value for the given nameHash +func (h *Handler) get(rootAddr storage.Address) *resource { + if len(rootAddr) < storage.KeyLength { + log.Warn("Handler.get with invalid rootAddr") + return nil + } + hashKey := *(*uint64)(unsafe.Pointer(&rootAddr[0])) + h.resourceLock.RLock() + defer h.resourceLock.RUnlock() + rsrc := h.resources[hashKey] + return rsrc +} + +// Sets the resource index value for the given nameHash +func (h *Handler) set(rootAddr storage.Address, rsrc *resource) { + if len(rootAddr) < storage.KeyLength { + log.Warn("Handler.set with invalid rootAddr") + return + } + hashKey := *(*uint64)(unsafe.Pointer(&rootAddr[0])) + h.resourceLock.Lock() + defer h.resourceLock.Unlock() + h.resources[hashKey] = rsrc +} + +// Checks if we already have an update on this resource, according to the value in the current state of the resource index +func (h *Handler) hasUpdate(rootAddr storage.Address, period uint32) bool { + rsrc := h.get(rootAddr) + return rsrc != nil && rsrc.period == period +} |