diff options
Diffstat (limited to 'swarm/storage/feed/handler.go')
-rw-r--r-- | swarm/storage/feed/handler.go | 298 |
1 files changed, 0 insertions, 298 deletions
diff --git a/swarm/storage/feed/handler.go b/swarm/storage/feed/handler.go deleted file mode 100644 index 98ed7fa99..000000000 --- a/swarm/storage/feed/handler.go +++ /dev/null @@ -1,298 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Handler is the API for feeds -// It enables creating, updating, syncing and retrieving feed updates and their data -package feed - -import ( - "bytes" - "context" - "fmt" - "sync" - "sync/atomic" - - "github.com/ethereum/go-ethereum/swarm/chunk" - - "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" - - "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -type Handler struct { - chunkStore *storage.NetStore - HashSize int - cache map[uint64]*cacheEntry - cacheLock sync.RWMutex -} - -// HandlerParams pass parameters to the Handler constructor NewHandler -// Signer and TimestampProvider are mandatory parameters -type HandlerParams struct { -} - -// hashPool contains a pool of ready hashers -var hashPool sync.Pool - -// init initializes the package and hashPool -func init() { - hashPool = sync.Pool{ - New: func() interface{} { - return storage.MakeHashFunc(feedsHashAlgorithm)() - }, - } -} - -// NewHandler creates a new Swarm feeds API -func NewHandler(params *HandlerParams) *Handler { - fh := &Handler{ - cache: make(map[uint64]*cacheEntry), - } - - for i := 0; i < hasherCount; i++ { - hashfunc := storage.MakeHashFunc(feedsHashAlgorithm)() - if fh.HashSize == 0 { - fh.HashSize = hashfunc.Size() - } - hashPool.Put(hashfunc) - } - - return fh -} - -// SetStore sets the store backend for the Swarm feeds API -func (h *Handler) SetStore(store *storage.NetStore) { - h.chunkStore = store -} - -// Validate is a chunk validation method -// If it looks like a feed update, the chunk address is checked against the userAddr of the update's signature -// It implements the storage.ChunkValidator interface -func (h *Handler) Validate(chunk storage.Chunk) bool { - if len(chunk.Data()) < minimumSignedUpdateLength { - return false - } - - // check if it is a properly formatted update chunk with - // valid signature and proof of ownership of the feed it is trying - // to update - - // First, deserialize the chunk - var r Request - if err := r.fromChunk(chunk); err != nil { - log.Debug("Invalid feed update chunk", "addr", chunk.Address(), "err", err) - return false - } - - // Verify signatures and that the signer actually owns the feed - // If it fails, it means either the signature is not valid, data is corrupted - // or someone is trying to update someone else's feed. - if err := r.Verify(); err != nil { - log.Debug("Invalid feed update signature", "err", err) - return false - } - - return true -} - -// GetContent retrieves the data payload of the last synced update of the feed -func (h *Handler) GetContent(feed *Feed) (storage.Address, []byte, error) { - if feed == nil { - return nil, nil, NewError(ErrInvalidValue, "feed is nil") - } - feedUpdate := h.get(feed) - if feedUpdate == nil { - return nil, nil, NewError(ErrNotFound, "feed update not cached") - } - return feedUpdate.lastKey, feedUpdate.data, nil -} - -// NewRequest prepares a Request structure with all the necessary information to -// just add the desired data and sign it. -// The resulting structure can then be signed and passed to Handler.Update to be verified and sent -func (h *Handler) NewRequest(ctx context.Context, feed *Feed) (request *Request, err error) { - if feed == nil { - return nil, NewError(ErrInvalidValue, "feed cannot be nil") - } - - now := TimestampProvider.Now().Time - request = new(Request) - request.Header.Version = ProtocolVersion - - query := NewQueryLatest(feed, lookup.NoClue) - - feedUpdate, err := h.Lookup(ctx, query) - if err != nil { - if err.(*Error).code != ErrNotFound { - return nil, err - } - // not finding updates means that there is a network error - // or that the feed really does not have updates - } - - request.Feed = *feed - - // if we already have an update, then find next epoch - if feedUpdate != nil { - request.Epoch = lookup.GetNextEpoch(feedUpdate.Epoch, now) - } else { - request.Epoch = lookup.GetFirstEpoch(now) - } - - return request, nil -} - -// Lookup retrieves a specific or latest feed update -// Lookup works differently depending on the configuration of `query` -// See the `query` documentation and helper functions: -// `NewQueryLatest` and `NewQuery` -func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) { - - timeLimit := query.TimeLimit - if timeLimit == 0 { // if time limit is set to zero, the user wants to get the latest update - timeLimit = TimestampProvider.Now().Time - } - - if query.Hint == lookup.NoClue { // try to use our cache - entry := h.get(&query.Feed) - if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints - query.Hint = entry.Epoch - } - } - - // we can't look for anything without a store - if h.chunkStore == nil { - return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups") - } - - var readCount int32 - - // Invoke the lookup engine. - // The callback will be called every time the lookup algorithm needs to guess - requestPtr, err := lookup.Lookup(ctx, timeLimit, query.Hint, func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - atomic.AddInt32(&readCount, 1) - id := ID{ - Feed: query.Feed, - Epoch: epoch, - } - ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) - defer cancel() - - ch, err := h.chunkStore.Get(ctx, chunk.ModeGetLookup, id.Addr()) - if err != nil { - if err == context.DeadlineExceeded { // chunk not found - return nil, nil - } - return nil, err //something else happened or context was cancelled. - } - - var request Request - if err := request.fromChunk(ch); err != nil { - return nil, nil - } - if request.Time <= timeLimit { - return &request, nil - } - return nil, nil - }) - if err != nil { - return nil, err - } - - log.Info(fmt.Sprintf("Feed lookup finished in %d lookups", readCount)) - - request, _ := requestPtr.(*Request) - if request == nil { - return nil, NewError(ErrNotFound, "no feed updates found") - } - return h.updateCache(request) - -} - -// update feed updates cache with specified content -func (h *Handler) updateCache(request *Request) (*cacheEntry, error) { - - updateAddr := request.Addr() - log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level) - - entry := h.get(&request.Feed) - if entry == nil { - entry = &cacheEntry{} - h.set(&request.Feed, entry) - } - - // update our rsrcs entry map - entry.lastKey = updateAddr - entry.Update = request.Update - entry.Reader = bytes.NewReader(entry.data) - return entry, nil -} - -// Update publishes a feed update -// Note that a feed update cannot span chunks, and thus has a MAX NET LENGTH 4096, INCLUDING update header data and signature. -// This results in a max payload of `maxUpdateDataLength` (check update.go for more details) -// An error will be returned if the total length of the chunk payload will exceed this limit. -// Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update -// on the network. -func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Address, err error) { - - // we can't update anything without a store - if h.chunkStore == nil { - return nil, NewError(ErrInit, "Call Handler.SetStore() before updating") - } - - feedUpdate := h.get(&r.Feed) - if feedUpdate != nil && feedUpdate.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure - return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist") - } - - ch, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big - if err != nil { - return nil, err - } - - // send the chunk - h.chunkStore.Put(ctx, chunk.ModePutUpload, ch) - log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", ch.Data()) - // update our feed updates map cache entry if the new update is older than the one we have, if we have it. - if feedUpdate != nil && r.Epoch.After(feedUpdate.Epoch) { - feedUpdate.Epoch = r.Epoch - feedUpdate.data = make([]byte, len(r.data)) - feedUpdate.lastKey = r.idAddr - copy(feedUpdate.data, r.data) - feedUpdate.Reader = bytes.NewReader(feedUpdate.data) - } - - return r.idAddr, nil -} - -// Retrieves the feed update cache value for the given nameHash -func (h *Handler) get(feed *Feed) *cacheEntry { - mapKey := feed.mapKey() - h.cacheLock.RLock() - defer h.cacheLock.RUnlock() - feedUpdate := h.cache[mapKey] - return feedUpdate -} - -// Sets the feed update cache value for the given feed -func (h *Handler) set(feed *Feed, feedUpdate *cacheEntry) { - mapKey := feed.mapKey() - h.cacheLock.Lock() - defer h.cacheLock.Unlock() - h.cache[mapKey] = feedUpdate -} |