Diffstat (limited to 'swarm/storage/feeds')
-rw-r--r--  swarm/storage/feeds/binaryserializer.go        44
-rw-r--r--  swarm/storage/feeds/binaryserializer_test.go    98
-rw-r--r--  swarm/storage/feeds/cacheentry.go               48
-rw-r--r--  swarm/storage/feeds/doc.go                      43
-rw-r--r--  swarm/storage/feeds/error.go                    73
-rw-r--r--  swarm/storage/feeds/feed.go                    125
-rw-r--r--  swarm/storage/feeds/feed_test.go                36
-rw-r--r--  swarm/storage/feeds/handler.go                 295
-rw-r--r--  swarm/storage/feeds/handler_test.go            520
-rw-r--r--  swarm/storage/feeds/id.go                      123
-rw-r--r--  swarm/storage/feeds/id_test.go                  28
-rw-r--r--  swarm/storage/feeds/lookup/epoch.go             91
-rw-r--r--  swarm/storage/feeds/lookup/epoch_test.go        57
-rw-r--r--  swarm/storage/feeds/lookup/lookup.go           180
-rw-r--r--  swarm/storage/feeds/lookup/lookup_test.go      414
-rw-r--r--  swarm/storage/feeds/query.go                    78
-rw-r--r--  swarm/storage/feeds/query_test.go               38
-rw-r--r--  swarm/storage/feeds/request.go                 284
-rw-r--r--  swarm/storage/feeds/request_test.go            312
-rw-r--r--  swarm/storage/feeds/sign.go                     75
-rw-r--r--  swarm/storage/feeds/testutil.go                 71
-rw-r--r--  swarm/storage/feeds/timestampprovider.go        84
-rw-r--r--  swarm/storage/feeds/topic.go                   105
-rw-r--r--  swarm/storage/feeds/topic_test.go               50
-rw-r--r--  swarm/storage/feeds/update.go                  132
-rw-r--r--  swarm/storage/feeds/update_test.go              50
26 files changed, 3454 insertions(+), 0 deletions(-)
diff --git a/swarm/storage/feeds/binaryserializer.go b/swarm/storage/feeds/binaryserializer.go
new file mode 100644
index 000000000..ce146571b
--- /dev/null
+++ b/swarm/storage/feeds/binaryserializer.go
@@ -0,0 +1,44 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import "github.com/ethereum/go-ethereum/common/hexutil"
+
+type binarySerializer interface {
+ binaryPut(serializedData []byte) error
+ binaryLength() int
+ binaryGet(serializedData []byte) error
+}
+
+// Values interface represents a string key-value store
+// useful for building query strings
+type Values interface {
+ Get(key string) string
+ Set(key, value string)
+}
+
+type valueSerializer interface {
+ FromValues(values Values) error
+ AppendValues(values Values)
+}
+
+// Hex serializes the structure and converts it to a hex string
+func Hex(bin binarySerializer) string {
+ b := make([]byte, bin.binaryLength())
+ bin.binaryPut(b)
+ return hexutil.Encode(b)
+}
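
The Values interface above is shaped so that an ordinary net/url query string can back it directly. A minimal sketch, assuming only the import path introduced in this diff (swarm/storage/feeds); the compile-time assertion is the illustrative part:

package main

import (
	"fmt"
	"net/url"

	"github.com/ethereum/go-ethereum/swarm/storage/feeds"
)

// compile-time check: url.Values already provides the Get/Set pair required by feeds.Values
var _ feeds.Values = url.Values{}

func main() {
	q := url.Values{}
	q.Set("user", "0x876a8936a7cd0b79ef0735ad0896c1afe278781c")
	fmt.Println(q.Get("user"))
}
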
diff --git a/swarm/storage/feeds/binaryserializer_test.go b/swarm/storage/feeds/binaryserializer_test.go
new file mode 100644
index 000000000..0c81e7f18
--- /dev/null
+++ b/swarm/storage/feeds/binaryserializer_test.go
@@ -0,0 +1,98 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "encoding/json"
+ "reflect"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+)
+
+// KV mocks a key value store
+type KV map[string]string
+
+func (kv KV) Get(key string) string {
+ return kv[key]
+}
+func (kv KV) Set(key, value string) {
+ kv[key] = value
+}
+
+func compareByteSliceToExpectedHex(t *testing.T, variableName string, actualValue []byte, expectedHex string) {
+ if hexutil.Encode(actualValue) != expectedHex {
+ t.Fatalf("%s: Expected %s to be %s, got %s", t.Name(), variableName, expectedHex, hexutil.Encode(actualValue))
+ }
+}
+
+func testBinarySerializerRecovery(t *testing.T, bin binarySerializer, expectedHex string) {
+ name := reflect.TypeOf(bin).Elem().Name()
+ serialized := make([]byte, bin.binaryLength())
+ if err := bin.binaryPut(serialized); err != nil {
+ t.Fatalf("%s.binaryPut error when trying to serialize structure: %s", name, err)
+ }
+
+ compareByteSliceToExpectedHex(t, name, serialized, expectedHex)
+
+ recovered := reflect.New(reflect.TypeOf(bin).Elem()).Interface().(binarySerializer)
+ if err := recovered.binaryGet(serialized); err != nil {
+ t.Fatalf("%s.binaryGet error when trying to deserialize structure: %s", name, err)
+ }
+
+ if !reflect.DeepEqual(bin, recovered) {
+ t.Fatalf("Expected that the recovered %s equals the marshalled %s", name, name)
+ }
+
+ serializedWrongLength := make([]byte, 1)
+ copy(serializedWrongLength[:], serialized)
+ if err := recovered.binaryGet(serializedWrongLength); err == nil {
+ t.Fatalf("Expected %s.binaryGet to fail since data is too small", name)
+ }
+}
+
+func testBinarySerializerLengthCheck(t *testing.T, bin binarySerializer) {
+ name := reflect.TypeOf(bin).Elem().Name()
+ // make a slice that is too small to contain the metadata
+ serialized := make([]byte, bin.binaryLength()-1)
+
+ if err := bin.binaryPut(serialized); err == nil {
+ t.Fatalf("Expected %s.binaryPut to fail, since target slice is too small", name)
+ }
+}
+
+func testValueSerializer(t *testing.T, v valueSerializer, expected KV) {
+ name := reflect.TypeOf(v).Elem().Name()
+ kv := make(KV)
+
+ v.AppendValues(kv)
+ if !reflect.DeepEqual(expected, kv) {
+ expj, _ := json.Marshal(expected)
+ gotj, _ := json.Marshal(kv)
+ t.Fatalf("Expected %s.AppendValues to return %s, got %s", name, string(expj), string(gotj))
+ }
+
+ recovered := reflect.New(reflect.TypeOf(v).Elem()).Interface().(valueSerializer)
+ err := recovered.FromValues(kv)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !reflect.DeepEqual(recovered, v) {
+ t.Fatalf("Expected recovered %s to be the same", name)
+ }
+}
diff --git a/swarm/storage/feeds/cacheentry.go b/swarm/storage/feeds/cacheentry.go
new file mode 100644
index 000000000..7a2f87b56
--- /dev/null
+++ b/swarm/storage/feeds/cacheentry.go
@@ -0,0 +1,48 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "bytes"
+ "context"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const (
+ hasherCount = 8
+ feedsHashAlgorithm = storage.SHA3Hash
+ defaultRetrieveTimeout = 100 * time.Millisecond
+)
+
+// cacheEntry caches the last known update of a specific Feed.
+type cacheEntry struct {
+ Update
+ *bytes.Reader
+ lastKey storage.Address
+}
+
+// implements storage.LazySectionReader
+func (r *cacheEntry) Size(ctx context.Context, _ chan bool) (int64, error) {
+ return int64(len(r.Update.data)), nil
+}
+
+// Topic returns the Feed's topic
+func (r *cacheEntry) Topic() Topic {
+ return r.Feed.Topic
+}
diff --git a/swarm/storage/feeds/doc.go b/swarm/storage/feeds/doc.go
new file mode 100644
index 000000000..d1edf5d6d
--- /dev/null
+++ b/swarm/storage/feeds/doc.go
@@ -0,0 +1,43 @@
+/*
+Package feeds defines Swarm Feeds.
+
+Swarm Feeds allows a user to build an update feed about a particular topic
+without resorting to ENS on each update.
+The update scheme is built on swarm chunks with chunk keys following
+a predictable, versionable pattern.
+
+A Feed is tied to a unique identifier that is deterministically generated out of
+the chosen topic.
+
+A Feed is defined as the series of updates of a specific user about a particular topic.
+
+Actual data updates are also made in the form of swarm chunks. The keys
+of the updates are the hash of a concatenation of properties as follows:
+
+updateAddr = H(Feed, Epoch ID)
+where H is the SHA3 hash function
+Feed is the combination of Topic and the user address
+Epoch ID is a time slot. See the lookup package for more information.
+
+A user looking up the latest update in a Feed only needs to know the Topic
+and the other user's address.
+
+The Feed Update data is:
+updatedata = Feed|Epoch|data
+
+The full update data that goes in the chunk payload is:
+updatedata|sign(updatedata)
+
+Structure Summary:
+
+Request: Feed Update with signature
+ Update: headers + data
+ Header: Protocol version and reserved for future use placeholders
+ ID: Information about how to locate a specific update
+ Feed: Represents a user's series of publications about a specific Topic
+ Topic: Item that the updates are about
+ User: User who updates the Feed
+ Epoch: time slot where the update is stored
+
+*/
+package feeds
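
A hedged sketch of the address scheme described above, using the API added later in this diff (NewTopic, Feed, ID and lookup.GetFirstEpoch): the update chunk address is fully determined by the topic, the user and the epoch, so a reader can compute where to look without any registry. With these particular inputs the sketch reproduces the address asserted in id_test.go further down.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/swarm/storage/feeds"
	"github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
)

func main() {
	// Feed = Topic + user address
	topic, _ := feeds.NewTopic("world news report, every hour", nil)
	feed := feeds.Feed{
		Topic: topic,
		User:  common.HexToAddress("0x876a8936a7cd0b79ef0735ad0896c1afe278781c"),
	}

	// updateAddr = H(Feed, Epoch ID)
	id := feeds.ID{
		Feed:  feed,
		Epoch: lookup.GetFirstEpoch(1000),
	}
	fmt.Println(id.Addr().Hex())
	// expected (per id_test.go):
	// 0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783
}
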
diff --git a/swarm/storage/feeds/error.go b/swarm/storage/feeds/error.go
new file mode 100644
index 000000000..13266b900
--- /dev/null
+++ b/swarm/storage/feeds/error.go
@@ -0,0 +1,73 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "fmt"
+)
+
+const (
+ ErrInit = iota
+ ErrNotFound
+ ErrIO
+ ErrUnauthorized
+ ErrInvalidValue
+ ErrDataOverflow
+ ErrNothingToReturn
+ ErrCorruptData
+ ErrInvalidSignature
+ ErrNotSynced
+ ErrPeriodDepth
+ ErrCnt
+)
+
+// Error is the typed error object used for Swarm Feeds
+type Error struct {
+ code int
+ err string
+}
+
+// Error implements the error interface
+func (e *Error) Error() string {
+ return e.err
+}
+
+// Code returns the error code
+// Error codes are enumerated in the error.go file within the feeds package
+func (e *Error) Code() int {
+ return e.code
+}
+
+// NewError creates a new Swarm Feeds Error object with the specified code and custom error message
+func NewError(code int, s string) error {
+ if code < 0 || code >= ErrCnt {
+ panic("no such error code!")
+ }
+ r := &Error{
+ err: s,
+ }
+ switch code {
+ case ErrNotFound, ErrIO, ErrUnauthorized, ErrInvalidValue, ErrDataOverflow, ErrNothingToReturn, ErrInvalidSignature, ErrNotSynced, ErrPeriodDepth, ErrCorruptData:
+ r.code = code
+ }
+ return r
+}
+
+// NewErrorf is a convenience version of NewError that incorporates printf-style formatting
+func NewErrorf(code int, format string, args ...interface{}) error {
+ return NewError(code, fmt.Sprintf(format, args...))
+}
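
A short, hedged sketch of how a caller is expected to branch on the error code, similar to what Handler.NewRequest does in handler.go further down; the message and topic string are illustrative only:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/storage/feeds"
)

func main() {
	err := feeds.NewErrorf(feeds.ErrNotFound, "no update found for topic %s", "xyzzy")
	// distinguish "not found" (an empty feed) from real failures via Code()
	if feedsErr, ok := err.(*feeds.Error); ok && feedsErr.Code() == feeds.ErrNotFound {
		fmt.Println("treat as empty feed:", err)
	}
}
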
diff --git a/swarm/storage/feeds/feed.go b/swarm/storage/feeds/feed.go
new file mode 100644
index 000000000..8a807d506
--- /dev/null
+++ b/swarm/storage/feeds/feed.go
@@ -0,0 +1,125 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "hash"
+ "unsafe"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// Feed represents a particular user's stream of updates on a Topic
+type Feed struct {
+ Topic Topic `json:"topic"`
+ User common.Address `json:"user"`
+}
+
+// Feed layout:
+// TopicLength bytes
+// userAddr common.AddressLength bytes
+const feedLength = TopicLength + common.AddressLength
+
+// mapKey calculates a unique id for this feed. Used by the cache map in `Handler`
+func (f *Feed) mapKey() uint64 {
+ serializedData := make([]byte, feedLength)
+ f.binaryPut(serializedData)
+ hasher := hashPool.Get().(hash.Hash)
+ defer hashPool.Put(hasher)
+ hasher.Reset()
+ hasher.Write(serializedData)
+ hash := hasher.Sum(nil)
+ return *(*uint64)(unsafe.Pointer(&hash[0]))
+}
+
+// binaryPut serializes this Feed instance into the provided slice
+func (f *Feed) binaryPut(serializedData []byte) error {
+ if len(serializedData) != feedLength {
+ return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize Feed. Expected %d, got %d", feedLength, len(serializedData))
+ }
+ var cursor int
+ copy(serializedData[cursor:cursor+TopicLength], f.Topic[:TopicLength])
+ cursor += TopicLength
+
+ copy(serializedData[cursor:cursor+common.AddressLength], f.User[:])
+ cursor += common.AddressLength
+
+ return nil
+}
+
+// binaryLength returns the expected size of this structure when serialized
+func (f *Feed) binaryLength() int {
+ return feedLength
+}
+
+// binaryGet restores the current instance from the information contained in the passed slice
+func (f *Feed) binaryGet(serializedData []byte) error {
+ if len(serializedData) != feedLength {
+ return NewErrorf(ErrInvalidValue, "Incorrect slice size to read Feed. Expected %d, got %d", feedLength, len(serializedData))
+ }
+
+ var cursor int
+ copy(f.Topic[:], serializedData[cursor:cursor+TopicLength])
+ cursor += TopicLength
+
+ copy(f.User[:], serializedData[cursor:cursor+common.AddressLength])
+ cursor += common.AddressLength
+
+ return nil
+}
+
+// Hex serializes the Feed to a hex string
+func (f *Feed) Hex() string {
+ serializedData := make([]byte, feedLength)
+ f.binaryPut(serializedData)
+ return hexutil.Encode(serializedData)
+}
+
+// FromValues deserializes this instance from a string key-value store
+// useful to parse query strings
+func (f *Feed) FromValues(values Values) (err error) {
+ topic := values.Get("topic")
+ if topic != "" {
+ if err := f.Topic.FromHex(values.Get("topic")); err != nil {
+ return err
+ }
+ } else { // see if the user set name and relatedcontent
+ name := values.Get("name")
+ relatedContent, _ := hexutil.Decode(values.Get("relatedcontent"))
+ if len(relatedContent) > 0 {
+ if len(relatedContent) < storage.AddressLength {
+ return NewErrorf(ErrInvalidValue, "relatedcontent field must be a hex-encoded byte array exactly %d bytes long", storage.AddressLength)
+ }
+ relatedContent = relatedContent[:storage.AddressLength]
+ }
+ f.Topic, err = NewTopic(name, relatedContent)
+ if err != nil {
+ return err
+ }
+ }
+ f.User = common.HexToAddress(values.Get("user"))
+ return nil
+}
+
+// AppendValues serializes this structure into the provided string key-value store
+// useful to build query strings
+func (f *Feed) AppendValues(values Values) {
+ values.Set("topic", f.Topic.Hex())
+ values.Set("user", f.User.Hex())
+}
diff --git a/swarm/storage/feeds/feed_test.go b/swarm/storage/feeds/feed_test.go
new file mode 100644
index 000000000..7806e0ad7
--- /dev/null
+++ b/swarm/storage/feeds/feed_test.go
@@ -0,0 +1,36 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+package feeds
+
+import (
+ "testing"
+)
+
+func getTestFeed() *Feed {
+ topic, _ := NewTopic("world news report, every hour", nil)
+ return &Feed{
+ Topic: topic,
+ User: newCharlieSigner().Address(),
+ }
+}
+
+func TestFeedSerializerDeserializer(t *testing.T) {
+ testBinarySerializerRecovery(t, getTestFeed(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781c")
+}
+
+func TestFeedSerializerLengthCheck(t *testing.T) {
+ testBinarySerializerLengthCheck(t, getTestFeed())
+}
diff --git a/swarm/storage/feeds/handler.go b/swarm/storage/feeds/handler.go
new file mode 100644
index 000000000..9c69fd1b4
--- /dev/null
+++ b/swarm/storage/feeds/handler.go
@@ -0,0 +1,295 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Handler is the API for Feeds
+// It enables creating, updating, syncing and retrieving feed updates and their data
+package feeds
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+type Handler struct {
+ chunkStore *storage.NetStore
+ HashSize int
+ cache map[uint64]*cacheEntry
+ cacheLock sync.RWMutex
+ storeTimeout time.Duration
+ queryMaxPeriods uint32
+}
+
+// HandlerParams passes parameters to the Handler constructor NewHandler
+// It is currently empty: TimestampProvider is a package-level variable and signing is done per request
+type HandlerParams struct {
+}
+
+// hashPool contains a pool of ready hashers
+var hashPool sync.Pool
+
+// init initializes the package and hashPool
+func init() {
+ hashPool = sync.Pool{
+ New: func() interface{} {
+ return storage.MakeHashFunc(feedsHashAlgorithm)()
+ },
+ }
+}
+
+// NewHandler creates a new Swarm Feeds API
+func NewHandler(params *HandlerParams) *Handler {
+ fh := &Handler{
+ cache: make(map[uint64]*cacheEntry),
+ }
+
+ for i := 0; i < hasherCount; i++ {
+ hashfunc := storage.MakeHashFunc(feedsHashAlgorithm)()
+ if fh.HashSize == 0 {
+ fh.HashSize = hashfunc.Size()
+ }
+ hashPool.Put(hashfunc)
+ }
+
+ return fh
+}
+
+// SetStore sets the store backend for the Swarm Feeds API
+func (h *Handler) SetStore(store *storage.NetStore) {
+ h.chunkStore = store
+}
+
+// Validate is a chunk validation method
+// If it looks like a feed update, the chunk address is checked against the userAddr of the update's signature
+// It implements the storage.ChunkValidator interface
+func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool {
+ dataLength := len(data)
+ if dataLength < minimumSignedUpdateLength {
+ return false
+ }
+
+ // check if it is a properly formatted update chunk with
+ // valid signature and proof of ownership of the feed it is trying
+ // to update
+
+ // First, deserialize the chunk
+ var r Request
+ if err := r.fromChunk(chunkAddr, data); err != nil {
+ log.Debug("Invalid feed update chunk", "addr", chunkAddr.Hex(), "err", err.Error())
+ return false
+ }
+
+ // Verify signatures and that the signer actually owns the feed
+ // If it fails, it means either the signature is not valid, data is corrupted
+ // or someone is trying to update someone else's feed.
+ if err := r.Verify(); err != nil {
+ log.Debug("Invalid feed update signature", "err", err)
+ return false
+ }
+
+ return true
+}
+
+// GetContent retrieves the data payload of the last synced update of the Feed
+func (h *Handler) GetContent(feed *Feed) (storage.Address, []byte, error) {
+ if feed == nil {
+ return nil, nil, NewError(ErrInvalidValue, "feed is nil")
+ }
+ feedUpdate := h.get(feed)
+ if feedUpdate == nil {
+ return nil, nil, NewError(ErrNotFound, "feed update not cached")
+ }
+ return feedUpdate.lastKey, feedUpdate.data, nil
+}
+
+// NewRequest prepares a Request structure with all the necessary information to
+// just add the desired data and sign it.
+// The resulting structure can then be signed and passed to Handler.Update to be verified and sent
+func (h *Handler) NewRequest(ctx context.Context, feed *Feed) (request *Request, err error) {
+ if feed == nil {
+ return nil, NewError(ErrInvalidValue, "feed cannot be nil")
+ }
+
+ now := TimestampProvider.Now().Time
+ request = new(Request)
+ request.Header.Version = ProtocolVersion
+
+ query := NewQueryLatest(feed, lookup.NoClue)
+
+ feedUpdate, err := h.Lookup(ctx, query)
+ if err != nil {
+ if err.(*Error).code != ErrNotFound {
+ return nil, err
+ }
+ // not finding updates means that there is a network error
+ // or that the feed really does not have updates
+ }
+
+ request.Feed = *feed
+
+ // if we already have an update, then find next epoch
+ if feedUpdate != nil {
+ request.Epoch = lookup.GetNextEpoch(feedUpdate.Epoch, now)
+ } else {
+ request.Epoch = lookup.GetFirstEpoch(now)
+ }
+
+ return request, nil
+}
+
+// Lookup retrieves a specific or latest feed update
+// Lookup works differently depending on the configuration of `query`
+// See the `query` documentation and helper functions:
+// `NewQueryLatest` and `NewQuery`
+func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) {
+
+ timeLimit := query.TimeLimit
+ if timeLimit == 0 { // if time limit is set to zero, the user wants to get the latest update
+ timeLimit = TimestampProvider.Now().Time
+ }
+
+ if query.Hint == lookup.NoClue { // try to use our cache
+ entry := h.get(&query.Feed)
+ if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints
+ query.Hint = entry.Epoch
+ }
+ }
+
+ // we can't look for anything without a store
+ if h.chunkStore == nil {
+ return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
+ }
+
+ var id ID
+ id.Feed = query.Feed
+ var readCount int
+
+ // Invoke the lookup engine.
+ // The callback will be called every time the lookup algorithm needs to guess
+ requestPtr, err := lookup.Lookup(timeLimit, query.Hint, func(epoch lookup.Epoch, now uint64) (interface{}, error) {
+ readCount++
+ id.Epoch = epoch
+ ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
+ defer cancel()
+
+ chunk, err := h.chunkStore.Get(ctx, id.Addr())
+ if err != nil { // TODO: check for catastrophic errors other than chunk not found
+ return nil, nil
+ }
+
+ var request Request
+ if err := request.fromChunk(chunk.Address(), chunk.Data()); err != nil {
+ return nil, nil
+ }
+ if request.Time <= timeLimit {
+ return &request, nil
+ }
+ return nil, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ log.Info(fmt.Sprintf("Feed lookup finished in %d lookups", readCount))
+
+ request, _ := requestPtr.(*Request)
+ if request == nil {
+ return nil, NewError(ErrNotFound, "no feed updates found")
+ }
+ return h.updateCache(request)
+
+}
+
+// updateCache updates the feed update cache with the specified content
+func (h *Handler) updateCache(request *Request) (*cacheEntry, error) {
+
+ updateAddr := request.Addr()
+ log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
+
+ feedUpdate := h.get(&request.Feed)
+ if feedUpdate == nil {
+ feedUpdate = &cacheEntry{}
+ h.set(&request.Feed, feedUpdate)
+ }
+
+	// update the cache entry for this feed
+ feedUpdate.lastKey = updateAddr
+ feedUpdate.Update = request.Update
+ feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
+ return feedUpdate, nil
+}
+
+// Update publishes a feed update
+// Note that a Feed update cannot span chunks, and thus has a maximum net length of 4096 bytes, including update header data and signature.
+// This results in a max payload of `maxUpdateDataLength` (check update.go for more details)
+// An error will be returned if the total length of the chunk payload exceeds this limit.
+// Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update
+// on the network.
+func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Address, err error) {
+
+ // we can't update anything without a store
+ if h.chunkStore == nil {
+ return nil, NewError(ErrInit, "Call Handler.SetStore() before updating")
+ }
+
+ feedUpdate := h.get(&r.Feed)
+ if feedUpdate != nil && feedUpdate.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure
+ return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist")
+ }
+
+ chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
+ if err != nil {
+ return nil, err
+ }
+
+ // send the chunk
+ h.chunkStore.Put(ctx, chunk)
+ log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data())
+	// update our feed update cache entry if the new update is newer than the one we have, if we have it.
+ if feedUpdate != nil && r.Epoch.After(feedUpdate.Epoch) {
+ feedUpdate.Epoch = r.Epoch
+ feedUpdate.data = make([]byte, len(r.data))
+ feedUpdate.lastKey = r.idAddr
+ copy(feedUpdate.data, r.data)
+ feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
+ }
+
+ return r.idAddr, nil
+}
+
+// get retrieves the feed update cache value for the given Feed
+func (h *Handler) get(feed *Feed) *cacheEntry {
+ mapKey := feed.mapKey()
+ h.cacheLock.RLock()
+ defer h.cacheLock.RUnlock()
+ feedUpdate := h.cache[mapKey]
+ return feedUpdate
+}
+
+// set stores the feed update cache value for the given Feed
+func (h *Handler) set(feed *Feed, feedUpdate *cacheEntry) {
+ mapKey := feed.mapKey()
+ h.cacheLock.Lock()
+ defer h.cacheLock.Unlock()
+ h.cache[mapKey] = feedUpdate
+}
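
To tie the Handler API above together, here is a hedged sketch of one publish/lookup cycle, following the flow the tests below exercise. It assumes the handler has already been wired to a NetStore via SetStore and that TimestampProvider is configured (the tests do this through their setup helpers); the helper name publishAndRead and the topic and payload strings are illustrative only.

package example

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/swarm/storage/feeds"
	"github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
)

// publishAndRead posts two updates to a feed and reads back the latest one.
func publishAndRead(ctx context.Context, h *feeds.Handler, signer feeds.Signer, user common.Address) error {
	topic, _ := feeds.NewTopic("status updates", nil)
	feed := feeds.Feed{Topic: topic, User: user}

	// first update: NewFirstRequest places it in the first epoch for "now"
	request := feeds.NewFirstRequest(feed.Topic)
	request.SetData([]byte("hello world"))
	if err := request.Sign(signer); err != nil {
		return err
	}
	if _, err := h.Update(ctx, request); err != nil {
		return err
	}

	// later updates: let the handler suggest the next epoch for this feed
	request, err := h.NewRequest(ctx, &feed)
	if err != nil {
		return err
	}
	request.SetData([]byte("hello again"))
	if err := request.Sign(signer); err != nil {
		return err
	}
	if _, err := h.Update(ctx, request); err != nil {
		return err
	}

	// retrieve the latest update; Lookup fills the handler cache, GetContent reads it
	if _, err := h.Lookup(ctx, feeds.NewQueryLatest(&feed, lookup.NoClue)); err != nil {
		return err
	}
	_, content, err := h.GetContent(&feed)
	if err != nil {
		return err
	}
	fmt.Printf("latest update: %s\n", content)
	return nil
}
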
diff --git a/swarm/storage/feeds/handler_test.go b/swarm/storage/feeds/handler_test.go
new file mode 100644
index 000000000..8331980ca
--- /dev/null
+++ b/swarm/storage/feeds/handler_test.go
@@ -0,0 +1,520 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "bytes"
+ "context"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/crypto"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+)
+
+var (
+ loglevel = flag.Int("loglevel", 3, "loglevel")
+ startTime = Timestamp{
+ Time: uint64(4200),
+ }
+ cleanF func()
+ subtopicName = "føø.bar"
+ hashfunc = storage.MakeHashFunc(storage.DefaultHash)
+)
+
+func init() {
+ flag.Parse()
+ log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true)))))
+}
+
+// simulated timeProvider
+type fakeTimeProvider struct {
+ currentTime uint64
+}
+
+func (f *fakeTimeProvider) Tick() {
+ f.currentTime++
+}
+
+func (f *fakeTimeProvider) Set(time uint64) {
+ f.currentTime = time
+}
+
+func (f *fakeTimeProvider) FastForward(offset uint64) {
+ f.currentTime += offset
+}
+
+func (f *fakeTimeProvider) Now() Timestamp {
+ return Timestamp{
+ Time: f.currentTime,
+ }
+}
+
+// make updates and retrieve them based on periods and versions
+func TestFeedsHandler(t *testing.T) {
+
+ // make fake timeProvider
+ clock := &fakeTimeProvider{
+ currentTime: startTime.Time, // clock starts at t=4200
+ }
+
+ // signer containing private key
+ signer := newAliceSigner()
+
+ feedsHandler, datadir, teardownTest, err := setupTest(clock, signer)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer teardownTest()
+
+ // create a new Feed
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ topic, _ := NewTopic("Mess with Swarm Feeds code and see what ghost catches you", nil)
+ feed := Feed{
+ Topic: topic,
+ User: signer.Address(),
+ }
+
+ // data for updates:
+ updates := []string{
+ "blinky", // t=4200
+ "pinky", // t=4242
+ "inky", // t=4284
+ "clyde", // t=4285
+ }
+
+ request := NewFirstRequest(feed.Topic) // this timestamps the update at t = 4200 (start time)
+ chunkAddress := make(map[string]storage.Address)
+ data := []byte(updates[0])
+ request.SetData(data)
+ if err := request.Sign(signer); err != nil {
+ t.Fatal(err)
+ }
+ chunkAddress[updates[0]], err = feedsHandler.Update(ctx, request)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // move the clock ahead 21 seconds
+ clock.FastForward(21) // t=4221
+
+ request, err = feedsHandler.NewRequest(ctx, &request.Feed) // this timestamps the update at t = 4221
+ if err != nil {
+ t.Fatal(err)
+ }
+ if request.Epoch.Base() != 0 || request.Epoch.Level != lookup.HighestLevel-1 {
+ t.Fatalf("Suggested epoch BaseTime should be 0 and Epoch level should be %d", lookup.HighestLevel-1)
+ }
+
+ request.Epoch.Level = lookup.HighestLevel // force level 25 instead of 24 to make it fail
+ data = []byte(updates[1])
+ request.SetData(data)
+ if err := request.Sign(signer); err != nil {
+ t.Fatal(err)
+ }
+ chunkAddress[updates[1]], err = feedsHandler.Update(ctx, request)
+ if err == nil {
+ t.Fatal("Expected update to fail since an update in this epoch already exists")
+ }
+
+ // move the clock ahead 21 seconds
+ clock.FastForward(21) // t=4242
+ request, err = feedsHandler.NewRequest(ctx, &request.Feed)
+ if err != nil {
+ t.Fatal(err)
+ }
+ request.SetData(data)
+ if err := request.Sign(signer); err != nil {
+ t.Fatal(err)
+ }
+ chunkAddress[updates[1]], err = feedsHandler.Update(ctx, request)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // move the clock ahead 42 seconds
+ clock.FastForward(42) // t=4284
+ request, err = feedsHandler.NewRequest(ctx, &request.Feed)
+ if err != nil {
+ t.Fatal(err)
+ }
+ data = []byte(updates[2])
+ request.SetData(data)
+ if err := request.Sign(signer); err != nil {
+ t.Fatal(err)
+ }
+ chunkAddress[updates[2]], err = feedsHandler.Update(ctx, request)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // move the clock ahead 1 second
+ clock.FastForward(1) // t=4285
+ request, err = feedsHandler.NewRequest(ctx, &request.Feed)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if request.Epoch.Base() != 0 || request.Epoch.Level != 22 {
+ t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 22, request.Epoch.Level)
+ }
+ data = []byte(updates[3])
+ request.SetData(data)
+
+ if err := request.Sign(signer); err != nil {
+ t.Fatal(err)
+ }
+ chunkAddress[updates[3]], err = feedsHandler.Update(ctx, request)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ time.Sleep(time.Second)
+ feedsHandler.Close()
+
+ // check we can retrieve the updates after close
+ clock.FastForward(2000) // t=6285
+
+ feedParams := &HandlerParams{}
+
+ feedsHandler2, err := NewTestHandler(datadir, feedParams)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ update2, err := feedsHandler2.Lookup(ctx, NewQueryLatest(&request.Feed, lookup.NoClue))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // last update should be "clyde"
+ if !bytes.Equal(update2.data, []byte(updates[len(updates)-1])) {
+ t.Fatalf("feed update data was %v, expected %v", string(update2.data), updates[len(updates)-1])
+ }
+ if update2.Level != 22 {
+ t.Fatalf("feed update epoch level was %d, expected 22", update2.Level)
+ }
+ if update2.Base() != 0 {
+ t.Fatalf("feed update epoch base time was %d, expected 0", update2.Base())
+ }
+ log.Debug("Latest lookup", "epoch base time", update2.Base(), "epoch level", update2.Level, "data", update2.data)
+
+ // specific point in time
+ update, err := feedsHandler2.Lookup(ctx, NewQuery(&request.Feed, 4284, lookup.NoClue))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // check data
+ if !bytes.Equal(update.data, []byte(updates[2])) {
+ t.Fatalf("feed update data (historical) was %v, expected %v", string(update2.data), updates[2])
+ }
+ log.Debug("Historical lookup", "epoch base time", update2.Base(), "epoch level", update2.Level, "data", update2.data)
+
+ // beyond the first should yield an error
+ update, err = feedsHandler2.Lookup(ctx, NewQuery(&request.Feed, startTime.Time-1, lookup.NoClue))
+ if err == nil {
+ t.Fatalf("expected previous to fail, returned epoch %s data %v", update.Epoch.String(), update.data)
+ }
+
+}
+
+const Day = 60 * 60 * 24
+const Year = Day * 365
+const Month = Day * 30
+
+func generateData(x uint64) []byte {
+ return []byte(fmt.Sprintf("%d", x))
+}
+
+func TestSparseUpdates(t *testing.T) {
+
+ // make fake timeProvider
+ timeProvider := &fakeTimeProvider{
+ currentTime: startTime.Time,
+ }
+
+ // signer containing private key
+ signer := newAliceSigner()
+
+ rh, datadir, teardownTest, err := setupTest(timeProvider, signer)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer teardownTest()
+ defer os.RemoveAll(datadir)
+
+ // create a new Feed
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ topic, _ := NewTopic("Very slow updates", nil)
+ feed := Feed{
+ Topic: topic,
+ User: signer.Address(),
+ }
+
+ // publish one update every 5 years since Unix 0 until today
+ today := uint64(1533799046)
+ var epoch lookup.Epoch
+ var lastUpdateTime uint64
+ for T := uint64(0); T < today; T += 5 * Year {
+ request := NewFirstRequest(feed.Topic)
+ request.Epoch = lookup.GetNextEpoch(epoch, T)
+ request.data = generateData(T) // this generates some data that depends on T, so we can check later
+ request.Sign(signer)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := rh.Update(ctx, request); err != nil {
+ t.Fatal(err)
+ }
+ epoch = request.Epoch
+ lastUpdateTime = T
+ }
+
+ query := NewQuery(&feed, today, lookup.NoClue)
+
+ _, err = rh.Lookup(ctx, query)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, content, err := rh.GetContent(&feed)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !bytes.Equal(generateData(lastUpdateTime), content) {
+ t.Fatalf("Expected to recover last written value %d, got %s", lastUpdateTime, string(content))
+ }
+
+ // lookup the closest update to 35*Year + 6* Month (~ June 2005):
+ // it should find the update we put on 35*Year, since we were updating every 5 years.
+
+ query.TimeLimit = 35*Year + 6*Month
+
+ _, err = rh.Lookup(ctx, query)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, content, err = rh.GetContent(&feed)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if !bytes.Equal(generateData(35*Year), content) {
+ t.Fatalf("Expected to recover %d, got %s", 35*Year, string(content))
+ }
+}
+
+func TestValidator(t *testing.T) {
+
+ // make fake timeProvider
+ timeProvider := &fakeTimeProvider{
+ currentTime: startTime.Time,
+ }
+
+ // signer containing private key. Alice will be the good girl
+ signer := newAliceSigner()
+
+ // set up sim timeProvider
+ rh, _, teardownTest, err := setupTest(timeProvider, signer)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer teardownTest()
+
+ // create new Feed
+ topic, _ := NewTopic(subtopicName, nil)
+ feed := Feed{
+ Topic: topic,
+ User: signer.Address(),
+ }
+ mr := NewFirstRequest(feed.Topic)
+
+ // chunk with address
+ data := []byte("foo")
+ mr.SetData(data)
+ if err := mr.Sign(signer); err != nil {
+ t.Fatalf("sign fail: %v", err)
+ }
+
+ chunk, err := mr.toChunk()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !rh.Validate(chunk.Address(), chunk.Data()) {
+ t.Fatal("Chunk validator fail on update chunk")
+ }
+
+ address := chunk.Address()
+ // mess with the address
+ address[0] = 11
+ address[15] = 99
+
+ if rh.Validate(address, chunk.Data()) {
+ t.Fatal("Expected Validate to fail with false chunk address")
+ }
+}
+
+// tests that the content address validator correctly checks the data
+// tests that Feed update chunks are passed through content address validator
+// there is some redundancy in this test as it also tests content addressed chunks,
+// which should be evaluated as invalid chunks by this validator
+func TestValidatorInStore(t *testing.T) {
+
+ // make fake timeProvider
+ TimestampProvider = &fakeTimeProvider{
+ currentTime: startTime.Time,
+ }
+
+ // signer containing private key
+ signer := newAliceSigner()
+
+ // set up localstore
+ datadir, err := ioutil.TempDir("", "storage-testfeedsvalidator")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(datadir)
+
+ handlerParams := storage.NewDefaultLocalStoreParams()
+ handlerParams.Init(datadir)
+ store, err := storage.NewLocalStore(handlerParams, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+	// set up Swarm Feeds handler and add it as a validator to the localstore
+ fhParams := &HandlerParams{}
+ fh := NewHandler(fhParams)
+ store.Validators = append(store.Validators, fh)
+
+ // create content addressed chunks, one good, one faulty
+ chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2)
+ goodChunk := chunks[0]
+ badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data())
+
+ topic, _ := NewTopic("xyzzy", nil)
+ feed := Feed{
+ Topic: topic,
+ User: signer.Address(),
+ }
+
+ // create a feed update chunk with correct publickey
+ id := ID{
+ Epoch: lookup.Epoch{Time: 42,
+ Level: 1,
+ },
+ Feed: feed,
+ }
+
+ updateAddr := id.Addr()
+ data := []byte("bar")
+
+ r := new(Request)
+ r.idAddr = updateAddr
+ r.Update.ID = id
+ r.data = data
+
+ r.Sign(signer)
+
+ uglyChunk, err := r.toChunk()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // put the chunks in the store and check their error status
+ err = store.Put(context.Background(), goodChunk)
+ if err == nil {
+ t.Fatal("expected error on good content address chunk with feed update validator only, but got nil")
+ }
+ err = store.Put(context.Background(), badChunk)
+ if err == nil {
+ t.Fatal("expected error on bad content address chunk with feed update validator only, but got nil")
+ }
+ err = store.Put(context.Background(), uglyChunk)
+ if err != nil {
+ t.Fatalf("expected no error on feed update chunk with feed update validator only, but got: %s", err)
+ }
+}
+
+// create rpc and Feeds Handler
+func setupTest(timeProvider timestampProvider, signer Signer) (fh *TestHandler, datadir string, teardown func(), err error) {
+
+ var fsClean func()
+ var rpcClean func()
+ cleanF = func() {
+ if fsClean != nil {
+ fsClean()
+ }
+ if rpcClean != nil {
+ rpcClean()
+ }
+ }
+
+ // temp datadir
+ datadir, err = ioutil.TempDir("", "fh")
+ if err != nil {
+ return nil, "", nil, err
+ }
+ fsClean = func() {
+ os.RemoveAll(datadir)
+ }
+
+ TimestampProvider = timeProvider
+ fhParams := &HandlerParams{}
+ fh, err = NewTestHandler(datadir, fhParams)
+ return fh, datadir, cleanF, err
+}
+
+func newAliceSigner() *GenericSigner {
+ privKey, _ := crypto.HexToECDSA("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
+ return NewGenericSigner(privKey)
+}
+
+func newBobSigner() *GenericSigner {
+ privKey, _ := crypto.HexToECDSA("accedeaccedeaccedeaccedeaccedeaccedeaccedeaccedeaccedeaccedecaca")
+ return NewGenericSigner(privKey)
+}
+
+func newCharlieSigner() *GenericSigner {
+ privKey, _ := crypto.HexToECDSA("facadefacadefacadefacadefacadefacadefacadefacadefacadefacadefaca")
+ return NewGenericSigner(privKey)
+}
+
+func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) {
+ chunk, err := rh.chunkStore.Get(context.TODO(), addr)
+ if err != nil {
+ return nil, err
+ }
+ var r Request
+ if err := r.fromChunk(addr, chunk.Data()); err != nil {
+ return nil, err
+ }
+ return r.data, nil
+}
diff --git a/swarm/storage/feeds/id.go b/swarm/storage/feeds/id.go
new file mode 100644
index 000000000..dd813ae89
--- /dev/null
+++ b/swarm/storage/feeds/id.go
@@ -0,0 +1,123 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "fmt"
+ "hash"
+ "strconv"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// ID uniquely identifies an update on the network.
+type ID struct {
+ Feed `json:"feed"`
+ lookup.Epoch `json:"epoch"`
+}
+
+// ID layout:
+// Feed feedLength bytes
+// Epoch EpochLength
+const idLength = feedLength + lookup.EpochLength
+
+// Addr calculates the feed update chunk address corresponding to this ID
+func (u *ID) Addr() (updateAddr storage.Address) {
+ serializedData := make([]byte, idLength)
+ var cursor int
+ u.Feed.binaryPut(serializedData[cursor : cursor+feedLength])
+ cursor += feedLength
+
+ eid := u.Epoch.ID()
+ copy(serializedData[cursor:cursor+lookup.EpochLength], eid[:])
+
+ hasher := hashPool.Get().(hash.Hash)
+ defer hashPool.Put(hasher)
+ hasher.Reset()
+ hasher.Write(serializedData)
+ return hasher.Sum(nil)
+}
+
+// binaryPut serializes this instance into the provided slice
+func (u *ID) binaryPut(serializedData []byte) error {
+ if len(serializedData) != idLength {
+ return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize ID. Expected %d, got %d", idLength, len(serializedData))
+ }
+ var cursor int
+ if err := u.Feed.binaryPut(serializedData[cursor : cursor+feedLength]); err != nil {
+ return err
+ }
+ cursor += feedLength
+
+ epochBytes, err := u.Epoch.MarshalBinary()
+ if err != nil {
+ return err
+ }
+ copy(serializedData[cursor:cursor+lookup.EpochLength], epochBytes[:])
+ cursor += lookup.EpochLength
+
+ return nil
+}
+
+// binaryLength returns the expected size of this structure when serialized
+func (u *ID) binaryLength() int {
+ return idLength
+}
+
+// binaryGet restores the current instance from the information contained in the passed slice
+func (u *ID) binaryGet(serializedData []byte) error {
+ if len(serializedData) != idLength {
+ return NewErrorf(ErrInvalidValue, "Incorrect slice size to read ID. Expected %d, got %d", idLength, len(serializedData))
+ }
+
+ var cursor int
+ if err := u.Feed.binaryGet(serializedData[cursor : cursor+feedLength]); err != nil {
+ return err
+ }
+ cursor += feedLength
+
+ if err := u.Epoch.UnmarshalBinary(serializedData[cursor : cursor+lookup.EpochLength]); err != nil {
+ return err
+ }
+ cursor += lookup.EpochLength
+
+ return nil
+}
+
+// FromValues deserializes this instance from a string key-value store
+// useful to parse query strings
+func (u *ID) FromValues(values Values) error {
+ level, _ := strconv.ParseUint(values.Get("level"), 10, 32)
+ u.Epoch.Level = uint8(level)
+ u.Epoch.Time, _ = strconv.ParseUint(values.Get("time"), 10, 64)
+
+ if u.Feed.User == (common.Address{}) {
+ return u.Feed.FromValues(values)
+ }
+ return nil
+}
+
+// AppendValues serializes this structure into the provided string key-value store
+// useful to build query strings
+func (u *ID) AppendValues(values Values) {
+ values.Set("level", fmt.Sprintf("%d", u.Epoch.Level))
+ values.Set("time", fmt.Sprintf("%d", u.Epoch.Time))
+ u.Feed.AppendValues(values)
+}
diff --git a/swarm/storage/feeds/id_test.go b/swarm/storage/feeds/id_test.go
new file mode 100644
index 000000000..2ef12e891
--- /dev/null
+++ b/swarm/storage/feeds/id_test.go
@@ -0,0 +1,28 @@
+package feeds
+
+import (
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+)
+
+func getTestID() *ID {
+ return &ID{
+ Feed: *getTestFeed(),
+ Epoch: lookup.GetFirstEpoch(1000),
+ }
+}
+
+func TestIDAddr(t *testing.T) {
+ id := getTestID()
+ updateAddr := id.Addr()
+ compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783")
+}
+
+func TestIDSerializer(t *testing.T) {
+ testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019")
+}
+
+func TestIDLengthCheck(t *testing.T) {
+ testBinarySerializerLengthCheck(t, getTestID())
+}
diff --git a/swarm/storage/feeds/lookup/epoch.go b/swarm/storage/feeds/lookup/epoch.go
new file mode 100644
index 000000000..bafe95477
--- /dev/null
+++ b/swarm/storage/feeds/lookup/epoch.go
@@ -0,0 +1,91 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package lookup
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+)
+
+// Epoch represents a time slot at a particular frequency level
+type Epoch struct {
+ Time uint64 `json:"time"` // Time stores the time at which the update or lookup takes place
+ Level uint8 `json:"level"` // Level indicates the frequency level as the exponent of a power of 2
+}
+
+// EpochID is a unique identifier for an Epoch, based on its level and base time.
+type EpochID [8]byte
+
+// EpochLength stores the serialized binary length of an Epoch
+const EpochLength = 8
+
+// MaxTime contains the highest possible time value an Epoch can handle
+const MaxTime uint64 = (1 << 56) - 1
+
+// Base returns the base time of the Epoch
+func (e *Epoch) Base() uint64 {
+ return getBaseTime(e.Time, e.Level)
+}
+
+// ID returns the unique identifier of this epoch
+func (e *Epoch) ID() EpochID {
+ base := e.Base()
+ var id EpochID
+ binary.LittleEndian.PutUint64(id[:], base)
+ id[7] = e.Level
+ return id
+}
+
+// MarshalBinary implements the encoding.BinaryMarshaler interface
+func (e *Epoch) MarshalBinary() (data []byte, err error) {
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b[:], e.Time)
+ b[7] = e.Level
+ return b, nil
+}
+
+// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface
+func (e *Epoch) UnmarshalBinary(data []byte) error {
+ if len(data) != EpochLength {
+ return errors.New("Invalid data unmarshalling Epoch")
+ }
+ b := make([]byte, 8)
+ copy(b, data)
+ e.Level = b[7]
+ b[7] = 0
+ e.Time = binary.LittleEndian.Uint64(b)
+ return nil
+}
+
+// After returns true if this epoch occurs later than or exactly at the other epoch.
+func (e *Epoch) After(epoch Epoch) bool {
+ if e.Time == epoch.Time {
+ return e.Level < epoch.Level
+ }
+ return e.Time >= epoch.Time
+}
+
+// Equals compares two epochs and returns true if they refer to the same time period.
+func (e *Epoch) Equals(epoch Epoch) bool {
+ return e.Level == epoch.Level && e.Base() == epoch.Base()
+}
+
+// String implements the Stringer interface.
+func (e *Epoch) String() string {
+ return fmt.Sprintf("Epoch{Time:%d, Level:%d}", e.Time, e.Level)
+}
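
A small worked sketch of the 8-byte encoding defined above: the time is stored little-endian and the most significant byte (index 7) is overwritten with the level, which is why MaxTime is capped at 2^56-1. The byte values in the comments follow directly from MarshalBinary as shown above.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
)

func main() {
	e := lookup.Epoch{Time: 4200, Level: 24}
	b, _ := e.MarshalBinary()
	fmt.Printf("serialized: %x\n", b) // 6810000000000018: 0x1068 = 4200 little-endian, last byte 0x18 = level 24

	var e2 lookup.Epoch
	_ = e2.UnmarshalBinary(b)
	fmt.Println(e2 == e) // true: the round trip is lossless for times below MaxTime

	// The epoch ID uses the *base* time (time with the lowest `level` bits
	// cleared) instead of the raw time, so all updates falling in the same
	// slot share one ID.
	fmt.Printf("id: %x\n", e.ID())
}
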
diff --git a/swarm/storage/feeds/lookup/epoch_test.go b/swarm/storage/feeds/lookup/epoch_test.go
new file mode 100644
index 000000000..70bfd836a
--- /dev/null
+++ b/swarm/storage/feeds/lookup/epoch_test.go
@@ -0,0 +1,57 @@
+package lookup_test
+
+import (
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+)
+
+func TestMarshallers(t *testing.T) {
+
+ for i := uint64(1); i < lookup.MaxTime; i *= 3 {
+ e := lookup.Epoch{
+ Time: i,
+ Level: uint8(i % 20),
+ }
+ b, err := e.MarshalBinary()
+ if err != nil {
+ t.Fatal(err)
+ }
+ var e2 lookup.Epoch
+ if err := e2.UnmarshalBinary(b); err != nil {
+ t.Fatal(err)
+ }
+ if e != e2 {
+			t.Fatal("Expected unmarshalled epoch to be equal to marshalled one")
+ }
+ }
+
+}
+
+func TestAfter(t *testing.T) {
+ a := lookup.Epoch{
+ Time: 5,
+ Level: 3,
+ }
+ b := lookup.Epoch{
+ Time: 6,
+ Level: 3,
+ }
+ c := lookup.Epoch{
+ Time: 6,
+ Level: 4,
+ }
+
+ if !b.After(a) {
+ t.Fatal("Expected 'after' to be true, got false")
+ }
+
+ if b.After(b) {
+ t.Fatal("Expected 'after' to be false when both epochs are identical, got true")
+ }
+
+ if !b.After(c) {
+ t.Fatal("Expected 'after' to be true when both epochs have the same time but the level is lower in the first one, but got false")
+ }
+
+}
diff --git a/swarm/storage/feeds/lookup/lookup.go b/swarm/storage/feeds/lookup/lookup.go
new file mode 100644
index 000000000..a5154d261
--- /dev/null
+++ b/swarm/storage/feeds/lookup/lookup.go
@@ -0,0 +1,180 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+/*
+Package lookup defines Feed lookup algorithms and provides tools to place updates
+so they can be found
+*/
+package lookup
+
+const maxuint64 = ^uint64(0)
+
+// LowestLevel establishes the frequency resolution of the lookup algorithm as a power of 2.
+const LowestLevel uint8 = 0 // default is 0 (1 second)
+
+// HighestLevel sets the lowest frequency the algorithm will operate at, as a power of 2.
+// 25 -> 2^25 seconds is roughly one year.
+const HighestLevel = 25 // default is 25 (~1 year)
+
+// DefaultLevel sets what level will be chosen to search when there is no hint
+const DefaultLevel = HighestLevel
+
+// Algorithm is the function signature of a lookup algorithm
+type Algorithm func(now uint64, hint Epoch, read ReadFunc) (value interface{}, err error)
+
+// Lookup finds the update with the highest timestamp that is smaller than or equal to 'now'
+// It takes a hint which should be the epoch where the last known update was
+// If you don't know in what epoch the last update happened, simply submit lookup.NoClue
+// read() will be called on each lookup attempt
+// Returns an error only if read() returns an error
+// Returns nil if an update was not found
+var Lookup Algorithm = FluzCapacitorAlgorithm
+
+// ReadFunc is a handler called by Lookup each time it attempts to find a value
+// It should return <nil> if a value is not found
+// It should return <nil> if a value is found, but its timestamp is higher than "now"
+// It should only return an error in case the handler wants to stop the
+// lookup process entirely.
+type ReadFunc func(epoch Epoch, now uint64) (interface{}, error)
+
+// NoClue is a hint that can be provided when the Lookup caller does not have
+// a clue about where the last update may be
+var NoClue = Epoch{}
+
+// getBaseTime returns the epoch base time of the given
+// time and level
+func getBaseTime(t uint64, level uint8) uint64 {
+ return t & (maxuint64 << level)
+}
+
+// Hint creates a hint based only on the last known update time
+func Hint(last uint64) Epoch {
+ return Epoch{
+ Time: last,
+ Level: DefaultLevel,
+ }
+}
+
+// GetNextLevel returns the frequency level a next update should be placed at, provided where
+// the last update was and what time it is now.
+// This is the position of the first nonzero bit of the XOR of the last epoch's base time and 'now', counting from the most significant bit,
+// but limited so that it never returns a level lower than the last level minus one.
+func GetNextLevel(last Epoch, now uint64) uint8 {
+ // First XOR the last epoch base time with the current clock.
+ // This will set all the common most significant bits to zero.
+ mix := (last.Base() ^ now)
+
+	// Then, make sure the loop below stops at one level below the current one at the latest,
+	// by setting that level's bit to 1.
+ // If the next level is lower than the current one, it must be exactly level-1 and not lower.
+ mix |= (1 << (last.Level - 1))
+
+ // if the last update was more than 2^highestLevel seconds ago, choose the highest level
+ if mix > (maxuint64 >> (64 - HighestLevel - 1)) {
+ return HighestLevel
+ }
+
+ // set up a mask to scan for nonzero bits, starting at the highest level
+ mask := uint64(1 << (HighestLevel))
+
+ for i := uint8(HighestLevel); i > LowestLevel; i-- {
+ if mix&mask != 0 { // if we find a nonzero bit, this is the level the next update should be at.
+ return i
+ }
+ mask = mask >> 1 // move our bit one position to the right
+ }
+ return 0
+}
+
+// GetNextEpoch returns the epoch where the next update should be located
+// according to where the previous update was
+// and what time it is now.
+func GetNextEpoch(last Epoch, now uint64) Epoch {
+ if last == NoClue {
+ return GetFirstEpoch(now)
+ }
+ level := GetNextLevel(last, now)
+ return Epoch{
+ Level: level,
+ Time: now,
+ }
+}
+
+// GetFirstEpoch returns the epoch where the first update should be located
+// based on what time it is now.
+func GetFirstEpoch(now uint64) Epoch {
+ return Epoch{Level: HighestLevel, Time: now}
+}
+
+var worstHint = Epoch{Time: 0, Level: 63}
+
+// FluzCapacitorAlgorithm narrows the epoch search area each time an update is found,
+// going back and forth in time.
+// First, it will attempt to find an update where it should be now if the hint was
+// really the last update. If that lookup fails, then the last update must be either the hint itself
+// or in the epochs right below. If, however, that lookup succeeds, then the update must be
+// that one or within the epochs right below.
+// See the guide for a more graphical representation.
+func FluzCapacitorAlgorithm(now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) {
+ var lastFound interface{}
+ var epoch Epoch
+ if hint == NoClue {
+ hint = worstHint
+ }
+
+ t := now
+
+ for {
+ epoch = GetNextEpoch(hint, t)
+ value, err = read(epoch, now)
+ if err != nil {
+ return nil, err
+ }
+ if value != nil {
+ lastFound = value
+ if epoch.Level == LowestLevel || epoch.Equals(hint) {
+ return value, nil
+ }
+ hint = epoch
+ continue
+ }
+ if epoch.Base() == hint.Base() {
+ if lastFound != nil {
+ return lastFound, nil
+ }
+ // we have reached the hint itself
+ if hint == worstHint {
+ return nil, nil
+ }
+ // check it out
+ value, err = read(hint, now)
+ if err != nil {
+ return nil, err
+ }
+ if value != nil {
+ return value, nil
+ }
+ // bad hint.
+ epoch = hint
+ hint = worstHint
+ }
+ base := epoch.Base()
+ if base == 0 {
+ return nil, nil
+ }
+ t = base - 1
+ }
+}
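+
+// A minimal usage sketch (the store and its contents are assumptions made for
+// illustration only; see lookup_test.go below for a complete example that also
+// honors the "timestamp not higher than now" rule of the ReadFunc contract):
+//
+//	store := make(map[EpochID]interface{})
+//	value, err := Lookup(now, NoClue, func(epoch Epoch, now uint64) (interface{}, error) {
+//		if data, ok := store[epoch.ID()]; ok {
+//			return data, nil // found an update in this epoch
+//		}
+//		return nil, nil // nothing here; Lookup will keep narrowing the search
+//	})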
diff --git a/swarm/storage/feeds/lookup/lookup_test.go b/swarm/storage/feeds/lookup/lookup_test.go
new file mode 100644
index 000000000..7d5014608
--- /dev/null
+++ b/swarm/storage/feeds/lookup/lookup_test.go
@@ -0,0 +1,414 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package lookup_test
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+)
+
+type Data struct {
+ Payload uint64
+ Time uint64
+}
+
+type Store map[lookup.EpochID]*Data
+
+func write(store Store, epoch lookup.Epoch, value *Data) {
+	log.Debug(fmt.Sprintf("Write: %d-%d, value='%d'", epoch.Base(), epoch.Level, value.Payload))
+ store[epoch.ID()] = value
+}
+
+func update(store Store, last lookup.Epoch, now uint64, value *Data) lookup.Epoch {
+ epoch := lookup.GetNextEpoch(last, now)
+
+ write(store, epoch, value)
+
+ return epoch
+}
+
+const Day = 60 * 60 * 24
+const Year = Day * 365
+const Month = Day * 30
+
+func makeReadFunc(store Store, counter *int) lookup.ReadFunc {
+ return func(epoch lookup.Epoch, now uint64) (interface{}, error) {
+ *counter++
+ data := store[epoch.ID()]
+ var valueStr string
+ if data != nil {
+ valueStr = fmt.Sprintf("%d", data.Payload)
+ }
+		log.Debug(fmt.Sprintf("Read: %d-%d, value='%s'", epoch.Base(), epoch.Level, valueStr))
+ if data != nil && data.Time <= now {
+ return data, nil
+ }
+ return nil, nil
+ }
+}
+
+func TestLookup(t *testing.T) {
+
+ store := make(Store)
+ readCount := 0
+ readFunc := makeReadFunc(store, &readCount)
+
+ // write an update every month for 12 months 3 years ago and then silence for two years
+ now := uint64(1533799046)
+ var epoch lookup.Epoch
+
+ var lastData *Data
+ for i := uint64(0); i < 12; i++ {
+ t := uint64(now - Year*3 + i*Month)
+ data := Data{
+ Payload: t, //our "payload" will be the timestamp itself.
+ Time: t,
+ }
+ epoch = update(store, epoch, t, &data)
+ lastData = &data
+ }
+
+ // try to get the last value
+
+ value, err := lookup.Lookup(now, lookup.NoClue, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ readCountWithoutHint := readCount
+
+ if value != lastData {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value)
+ }
+
+ // reset the read count for the next test
+ readCount = 0
+ // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update
+ value, err = lookup.Lookup(now, epoch, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if value != lastData {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value)
+ }
+
+ if readCount > readCountWithoutHint {
+		t.Fatalf("Expected lookup to complete with at most %d reads since we provided a hint. Did %d reads.", readCountWithoutHint, readCount)
+ }
+
+ // try to get an intermediate value
+ // if we look for a value in now - Year*3 + 6*Month, we should get that value
+ // Since the "payload" is the timestamp itself, we can check this.
+
+ expectedTime := now - Year*3 + 6*Month
+
+ value, err = lookup.Lookup(expectedTime, lookup.NoClue, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ data, ok := value.(*Data)
+
+ if !ok {
+ t.Fatal("Expected value to contain data")
+ }
+
+ if data.Time != expectedTime {
+		t.Fatalf("Expected value timestamp to be %d, got %d", expectedTime, data.Time)
+ }
+
+}
+
+func TestOneUpdateAt0(t *testing.T) {
+
+ store := make(Store)
+ readCount := 0
+
+ readFunc := makeReadFunc(store, &readCount)
+ now := uint64(1533903729)
+
+ var epoch lookup.Epoch
+ data := Data{
+ Payload: 79,
+ Time: 0,
+ }
+ update(store, epoch, 0, &data)
+
+ value, err := lookup.Lookup(now, lookup.NoClue, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if value != &data {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value)
+ }
+}
+
+// Tests the update is found even when a bad hint is given
+func TestBadHint(t *testing.T) {
+
+ store := make(Store)
+ readCount := 0
+
+ readFunc := makeReadFunc(store, &readCount)
+ now := uint64(1533903729)
+
+ var epoch lookup.Epoch
+ data := Data{
+ Payload: 79,
+ Time: 0,
+ }
+
+ // place an update for t=1200
+ update(store, epoch, 1200, &data)
+
+ // come up with some evil hint
+ badHint := lookup.Epoch{
+ Level: 18,
+ Time: 1200000000,
+ }
+
+ value, err := lookup.Lookup(now, badHint, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if value != &data {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value)
+ }
+}
+
+func TestLookupFail(t *testing.T) {
+
+ store := make(Store)
+ readCount := 0
+
+ readFunc := makeReadFunc(store, &readCount)
+ now := uint64(1533903729)
+
+ // don't write anything and try to look up.
+ // we're testing we don't get stuck in a loop
+
+ value, err := lookup.Lookup(now, lookup.NoClue, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if value != nil {
+ t.Fatal("Expected value to be nil, since the update should've failed")
+ }
+
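+	// with an empty store and no clue, the algorithm walks back one epoch of
+	// width 2^HighestLevel per read until it reaches time 0, hence the formula below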
+ expectedReads := now/(1<<lookup.HighestLevel) + 1
+ if uint64(readCount) != expectedReads {
+ t.Fatalf("Expected lookup to fail after %d reads. Did %d reads.", expectedReads, readCount)
+ }
+}
+
+func TestHighFreqUpdates(t *testing.T) {
+
+ store := make(Store)
+ readCount := 0
+
+ readFunc := makeReadFunc(store, &readCount)
+ now := uint64(1533903729)
+
+	// write an update every second for roughly the last 1000 seconds (995 updates)
+ var epoch lookup.Epoch
+
+ var lastData *Data
+ for i := uint64(0); i <= 994; i++ {
+ T := uint64(now - 1000 + i)
+ data := Data{
+ Payload: T, //our "payload" will be the timestamp itself.
+ Time: T,
+ }
+ epoch = update(store, epoch, T, &data)
+ lastData = &data
+ }
+
+ value, err := lookup.Lookup(lastData.Time, lookup.NoClue, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if value != lastData {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value)
+ }
+
+ readCountWithoutHint := readCount
+ // reset the read count for the next test
+ readCount = 0
+ // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update
+ value, err = lookup.Lookup(now, epoch, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if value != lastData {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value)
+ }
+
+ if readCount > readCountWithoutHint {
+		t.Fatalf("Expected lookup to complete with at most %d reads since we provided a hint. Did %d reads.", readCountWithoutHint, readCount)
+ }
+
+ for i := uint64(0); i <= 994; i++ {
+ T := uint64(now - 1000 + i) // update every second for the last 1000 seconds
+ value, err := lookup.Lookup(T, lookup.NoClue, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, _ := value.(*Data)
+ if data == nil {
+ t.Fatalf("Expected lookup to return %d, got nil", T)
+ }
+ if data.Payload != T {
+			t.Fatalf("Expected lookup to return %d, got %d", T, data.Payload)
+ }
+ }
+}
+
+func TestSparseUpdates(t *testing.T) {
+
+ store := make(Store)
+ readCount := 0
+ readFunc := makeReadFunc(store, &readCount)
+
+	// write an update every 5 years, 5 times, starting on Jan 1st 1970, and then silence
+
+ now := uint64(1533799046)
+ var epoch lookup.Epoch
+
+ var lastData *Data
+ for i := uint64(0); i < 5; i++ {
+		T := uint64(Year * 5 * i) // one update every 5 years starting on Jan 1st 1970
+ data := Data{
+ Payload: T, //our "payload" will be the timestamp itself.
+ Time: T,
+ }
+ epoch = update(store, epoch, T, &data)
+ lastData = &data
+ }
+
+ // try to get the last value
+
+ value, err := lookup.Lookup(now, lookup.NoClue, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ readCountWithoutHint := readCount
+
+ if value != lastData {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value)
+ }
+
+ // reset the read count for the next test
+ readCount = 0
+ // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update
+ value, err = lookup.Lookup(now, epoch, readFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if value != lastData {
+ t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value)
+ }
+
+ if readCount > readCountWithoutHint {
+		t.Fatalf("Expected lookup to complete with at most %d reads since we provided a hint. Did %d reads.", readCountWithoutHint, readCount)
+ }
+
+}
+
+// testG will hold precooked test results
+// fields are abbreviated to reduce the size of the literal below
+type testG struct {
+ e lookup.Epoch // last
+ n uint64 // next level
+ x uint8 // expected result
+}
+
+// test cases
+var testGetNextLevelCases []testG = []testG{{e: lookup.Epoch{Time: 989875233, Level: 12}, n: 989875233, x: 11}, {e: lookup.Epoch{Time: 995807650, Level: 18}, n: 995598156, x: 19}, {e: lookup.Epoch{Time: 969167082, Level: 0}, n: 968990357, x: 18}, {e: lookup.Epoch{Time: 993087628, Level: 14}, n: 992987044, x: 20}, {e: lookup.Epoch{Time: 963364631, Level: 20}, n: 963364630, x: 19}, {e: lookup.Epoch{Time: 963497510, Level: 16}, n: 963370732, x: 18}, {e: lookup.Epoch{Time: 955421349, Level: 22}, n: 955421348, x: 21}, {e: lookup.Epoch{Time: 968220379, Level: 15}, n: 968220378, x: 14}, {e: lookup.Epoch{Time: 939129014, Level: 6}, n: 939128771, x: 11}, {e: lookup.Epoch{Time: 907847903, Level: 6}, n: 907791833, x: 18}, {e: lookup.Epoch{Time: 910835564, Level: 15}, n: 910835564, x: 14}, {e: lookup.Epoch{Time: 913578333, Level: 22}, n: 881808431, x: 25}, {e: lookup.Epoch{Time: 895818460, Level: 3}, n: 895818132, x: 9}, {e: lookup.Epoch{Time: 903843025, Level: 24}, n: 895609561, x: 23}, {e: lookup.Epoch{Time: 877889433, Level: 13}, n: 877877093, x: 15}, {e: lookup.Epoch{Time: 901450396, Level: 10}, n: 901450058, x: 9}, {e: lookup.Epoch{Time: 925179910, Level: 3}, n: 925168393, x: 16}, {e: lookup.Epoch{Time: 913485477, Level: 21}, n: 913485476, x: 20}, {e: lookup.Epoch{Time: 924462991, Level: 18}, n: 924462990, x: 17}, {e: lookup.Epoch{Time: 941175128, Level: 13}, n: 941175127, x: 12}, {e: lookup.Epoch{Time: 920126583, Level: 3}, n: 920100782, x: 19}, {e: lookup.Epoch{Time: 932403200, Level: 9}, n: 932279891, x: 17}, {e: lookup.Epoch{Time: 948284931, Level: 2}, n: 948284921, x: 9}, {e: lookup.Epoch{Time: 953540997, Level: 7}, n: 950547986, x: 22}, {e: lookup.Epoch{Time: 926639837, Level: 18}, n: 918608882, x: 24}, {e: lookup.Epoch{Time: 954637598, Level: 1}, n: 954578761, x: 17}, {e: lookup.Epoch{Time: 943482981, Level: 10}, n: 942924151, x: 19}, {e: lookup.Epoch{Time: 963580771, Level: 7}, n: 963580771, x: 6}, {e: lookup.Epoch{Time: 993744930, Level: 7}, n: 993690858, x: 16}, {e: lookup.Epoch{Time: 1018890213, Level: 12}, n: 1018890212, x: 11}, {e: lookup.Epoch{Time: 1030309411, Level: 2}, n: 1030309227, x: 9}, {e: lookup.Epoch{Time: 1063204997, Level: 20}, n: 1063204996, x: 19}, {e: lookup.Epoch{Time: 1094340832, Level: 6}, n: 1094340633, x: 7}, {e: lookup.Epoch{Time: 1077880597, Level: 10}, n: 1075914292, x: 20}, {e: lookup.Epoch{Time: 1051114957, Level: 18}, n: 1051114957, x: 17}, {e: lookup.Epoch{Time: 1045649701, Level: 22}, n: 1045649700, x: 21}, {e: lookup.Epoch{Time: 1066198885, Level: 14}, n: 1066198884, x: 13}, {e: lookup.Epoch{Time: 1053231952, Level: 1}, n: 1053210845, x: 16}, {e: lookup.Epoch{Time: 1068763404, Level: 14}, n: 1068675428, x: 18}, {e: lookup.Epoch{Time: 1039042173, Level: 15}, n: 1038973110, x: 17}, {e: lookup.Epoch{Time: 1050747636, Level: 6}, n: 1050747364, x: 9}, {e: lookup.Epoch{Time: 1030034434, Level: 23}, n: 1030034433, x: 22}, {e: lookup.Epoch{Time: 1003783425, Level: 18}, n: 1003783424, x: 17}, {e: lookup.Epoch{Time: 988163976, Level: 15}, n: 988084064, x: 17}, {e: lookup.Epoch{Time: 1007222377, Level: 15}, n: 1007222377, x: 14}, {e: lookup.Epoch{Time: 1001211375, Level: 13}, n: 1001208178, x: 14}, {e: lookup.Epoch{Time: 997623199, Level: 8}, n: 997623198, x: 7}, {e: lookup.Epoch{Time: 1026283830, Level: 10}, n: 1006681704, x: 24}, {e: lookup.Epoch{Time: 1019421907, Level: 20}, n: 1019421906, x: 19}, {e: lookup.Epoch{Time: 1043154306, Level: 16}, n: 1043108343, x: 16}, {e: lookup.Epoch{Time: 1075643767, Level: 17}, n: 1075325898, x: 18}, {e: lookup.Epoch{Time: 
1043726309, Level: 20}, n: 1043726308, x: 19}, {e: lookup.Epoch{Time: 1056415324, Level: 17}, n: 1056415324, x: 16}, {e: lookup.Epoch{Time: 1088650219, Level: 13}, n: 1088650218, x: 12}, {e: lookup.Epoch{Time: 1088551662, Level: 7}, n: 1088543355, x: 13}, {e: lookup.Epoch{Time: 1069667265, Level: 6}, n: 1069667075, x: 7}, {e: lookup.Epoch{Time: 1079145970, Level: 18}, n: 1079145969, x: 17}, {e: lookup.Epoch{Time: 1083338876, Level: 7}, n: 1083338875, x: 6}, {e: lookup.Epoch{Time: 1051581086, Level: 4}, n: 1051568869, x: 14}, {e: lookup.Epoch{Time: 1028430882, Level: 4}, n: 1028430864, x: 5}, {e: lookup.Epoch{Time: 1057356462, Level: 1}, n: 1057356417, x: 5}, {e: lookup.Epoch{Time: 1033104266, Level: 0}, n: 1033097479, x: 13}, {e: lookup.Epoch{Time: 1031391367, Level: 11}, n: 1031387304, x: 14}, {e: lookup.Epoch{Time: 1049781164, Level: 15}, n: 1049781163, x: 14}, {e: lookup.Epoch{Time: 1027271628, Level: 12}, n: 1027271627, x: 11}, {e: lookup.Epoch{Time: 1057270560, Level: 23}, n: 1057270560, x: 22}, {e: lookup.Epoch{Time: 1047501317, Level: 15}, n: 1047501317, x: 14}, {e: lookup.Epoch{Time: 1058349035, Level: 11}, n: 1045175573, x: 24}, {e: lookup.Epoch{Time: 1057396147, Level: 20}, n: 1057396147, x: 19}, {e: lookup.Epoch{Time: 1048906375, Level: 18}, n: 1039616919, x: 25}, {e: lookup.Epoch{Time: 1074294831, Level: 20}, n: 1074294831, x: 19}, {e: lookup.Epoch{Time: 1088946052, Level: 1}, n: 1088917364, x: 14}, {e: lookup.Epoch{Time: 1112337595, Level: 17}, n: 1111008110, x: 22}, {e: lookup.Epoch{Time: 1099990284, Level: 5}, n: 1099968370, x: 15}, {e: lookup.Epoch{Time: 1087036441, Level: 16}, n: 1053967855, x: 25}, {e: lookup.Epoch{Time: 1069225185, Level: 8}, n: 1069224660, x: 10}, {e: lookup.Epoch{Time: 1057505479, Level: 9}, n: 1057505170, x: 14}, {e: lookup.Epoch{Time: 1072381377, Level: 12}, n: 1065950959, x: 22}, {e: lookup.Epoch{Time: 1093887139, Level: 8}, n: 1093863305, x: 14}, {e: lookup.Epoch{Time: 1082366510, Level: 24}, n: 1082366510, x: 23}, {e: lookup.Epoch{Time: 1103231132, Level: 14}, n: 1102292201, x: 22}, {e: lookup.Epoch{Time: 1094502355, Level: 3}, n: 1094324652, x: 18}, {e: lookup.Epoch{Time: 1068488344, Level: 12}, n: 1067577330, x: 19}, {e: lookup.Epoch{Time: 1050278233, Level: 12}, n: 1050278232, x: 11}, {e: lookup.Epoch{Time: 1047660768, Level: 5}, n: 1047652137, x: 17}, {e: lookup.Epoch{Time: 1060116167, Level: 11}, n: 1060114091, x: 12}, {e: lookup.Epoch{Time: 1068149392, Level: 21}, n: 1052074801, x: 24}, {e: lookup.Epoch{Time: 1081934120, Level: 6}, n: 1081933847, x: 8}, {e: lookup.Epoch{Time: 1107943693, Level: 16}, n: 1107096139, x: 25}, {e: lookup.Epoch{Time: 1131571649, Level: 9}, n: 1131570428, x: 11}, {e: lookup.Epoch{Time: 1123139367, Level: 0}, n: 1122912198, x: 20}, {e: lookup.Epoch{Time: 1121144423, Level: 6}, n: 1120568289, x: 20}, {e: lookup.Epoch{Time: 1089932411, Level: 17}, n: 1089932410, x: 16}, {e: lookup.Epoch{Time: 1104899012, Level: 22}, n: 1098978789, x: 22}, {e: lookup.Epoch{Time: 1094588059, Level: 21}, n: 1094588059, x: 20}, {e: lookup.Epoch{Time: 1114987438, Level: 24}, n: 1114987437, x: 23}, {e: lookup.Epoch{Time: 1084186305, Level: 7}, n: 1084186241, x: 6}, {e: lookup.Epoch{Time: 1058827111, Level: 8}, n: 1058826504, x: 9}, {e: lookup.Epoch{Time: 1090679810, Level: 12}, n: 1090616539, x: 17}, {e: lookup.Epoch{Time: 1084299475, Level: 23}, n: 1084299475, x: 22}}
+
+func TestGetNextLevel(t *testing.T) {
+
+ // First, test well-known cases
+ last := lookup.Epoch{
+ Time: 1533799046,
+ Level: 5,
+ }
+
+ level := lookup.GetNextLevel(last, last.Time)
+ expected := uint8(4)
+ if level != expected {
+ t.Fatalf("Expected GetNextLevel to return %d for same-time updates at a nonzero level, got %d", expected, level)
+ }
+
+ level = lookup.GetNextLevel(last, last.Time+(1<<lookup.HighestLevel)+3000)
+ expected = lookup.HighestLevel
+ if level != expected {
+		t.Fatalf("Expected GetNextLevel to return %d for updates set more than 2^lookup.HighestLevel seconds away, got %d", expected, level)
+ }
+
+ level = lookup.GetNextLevel(last, last.Time+(1<<last.Level))
+ expected = last.Level
+ if level != expected {
+ t.Fatalf("Expected GetNextLevel to return %d for updates set 2^last.Level seconds away, got %d", expected, level)
+ }
+
+ last.Level = 0
+ level = lookup.GetNextLevel(last, last.Time)
+ expected = 0
+ if level != expected {
+ t.Fatalf("Expected GetNextLevel to return %d for same-time updates at a zero level, got %d", expected, level)
+ }
+
+ // run a batch of 100 cooked tests
+ for _, s := range testGetNextLevelCases {
+ level := lookup.GetNextLevel(s.e, s.n)
+ if level != s.x {
+ t.Fatalf("Expected GetNextLevel to return %d for last=%s when now=%d, got %d", s.x, s.e.String(), s.n, level)
+ }
+ }
+
+}
+
+// CookGetNextLevelTests generates a deterministic
+// set of cases for TestGetNextLevel and thus "freezes" its current behavior
+func CookGetNextLevelTests(t *testing.T) {
+ st := ""
+ var last lookup.Epoch
+ last.Time = 1000000000
+ var now uint64
+ var expected uint8
+ for i := 0; i < 100; i++ {
+ last.Time += uint64(rand.Intn(1<<26)) - (1 << 25)
+ last.Level = uint8(rand.Intn(25))
+ v := last.Level + uint8(rand.Intn(lookup.HighestLevel))
+ if v > lookup.HighestLevel {
+ v = 0
+ }
+ now = last.Time + uint64(rand.Intn(1<<v+1)) - (1 << v)
+ expected = lookup.GetNextLevel(last, now)
+ st = fmt.Sprintf("%s,testG{e:lookup.Epoch{Time:%d, Level:%d}, n:%d, x:%d}", st, last.Time, last.Level, now, expected)
+ }
+ fmt.Println(st)
+}
diff --git a/swarm/storage/feeds/query.go b/swarm/storage/feeds/query.go
new file mode 100644
index 000000000..7bd2800a8
--- /dev/null
+++ b/swarm/storage/feeds/query.go
@@ -0,0 +1,78 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+)
+
+// Query is used to specify constraints when performing an update lookup
+// TimeLimit indicates an upper bound for the search. Set to 0 for "now"
+type Query struct {
+ Feed
+ Hint lookup.Epoch
+ TimeLimit uint64
+}
+
+// FromValues deserializes this instance from a string key-value store
+// useful to parse query strings
+func (q *Query) FromValues(values Values) error {
+ time, _ := strconv.ParseUint(values.Get("time"), 10, 64)
+ q.TimeLimit = time
+
+ level, _ := strconv.ParseUint(values.Get("hint.level"), 10, 32)
+ q.Hint.Level = uint8(level)
+ q.Hint.Time, _ = strconv.ParseUint(values.Get("hint.time"), 10, 64)
+ if q.Feed.User == (common.Address{}) {
+ return q.Feed.FromValues(values)
+ }
+ return nil
+}
+
+// AppendValues serializes this structure into the provided string key-value store
+// useful to build query strings
+func (q *Query) AppendValues(values Values) {
+ if q.TimeLimit != 0 {
+ values.Set("time", fmt.Sprintf("%d", q.TimeLimit))
+ }
+ if q.Hint.Level != 0 {
+ values.Set("hint.level", fmt.Sprintf("%d", q.Hint.Level))
+ }
+ if q.Hint.Time != 0 {
+ values.Set("hint.time", fmt.Sprintf("%d", q.Hint.Time))
+ }
+ q.Feed.AppendValues(values)
+}
+
+// NewQuery constructs a Query structure to find updates on or before `time`.
+// If time == 0, the latest update will be looked up
+func NewQuery(feed *Feed, time uint64, hint lookup.Epoch) *Query {
+ return &Query{
+ TimeLimit: time,
+ Feed: *feed,
+ Hint: hint,
+ }
+}
+
+// NewQueryLatest generates lookup parameters that look for the latest update to a feed
+func NewQueryLatest(feed *Feed, hint lookup.Epoch) *Query {
+ return NewQuery(feed, 0, hint)
+}
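+
+// A minimal sketch of building a query string from a Query. net/url's Values
+// type satisfies the Values interface used here (the feed variable is assumed
+// to be an existing Feed):
+//
+//	q := NewQueryLatest(&feed, lookup.NoClue)
+//	values := make(url.Values)
+//	q.AppendValues(values)
+//	fmt.Println(values.Encode()) // e.g. "topic=0x...&user=0x..."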
diff --git a/swarm/storage/feeds/query_test.go b/swarm/storage/feeds/query_test.go
new file mode 100644
index 000000000..1420c69ae
--- /dev/null
+++ b/swarm/storage/feeds/query_test.go
@@ -0,0 +1,38 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "testing"
+)
+
+func getTestQuery() *Query {
+ id := getTestID()
+ return &Query{
+ TimeLimit: 5000,
+ Feed: id.Feed,
+ Hint: id.Epoch,
+ }
+}
+
+func TestQueryValues(t *testing.T) {
+ var expected = KV{"hint.level": "25", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"}
+
+ query := getTestQuery()
+ testValueSerializer(t, query, expected)
+
+}
diff --git a/swarm/storage/feeds/request.go b/swarm/storage/feeds/request.go
new file mode 100644
index 000000000..719d8fba8
--- /dev/null
+++ b/swarm/storage/feeds/request.go
@@ -0,0 +1,284 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "bytes"
+ "encoding/json"
+ "hash"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+)
+
+// Request represents a request to sign, or an already signed, feed update message
+type Request struct {
+ Update // actual content that will be put on the chunk, less signature
+ Signature *Signature
+ idAddr storage.Address // cached chunk address for the update (not serialized, for internal use)
+ binaryData []byte // cached serialized data (does not get serialized again!, for efficiency/internal use)
+}
+
+// updateRequestJSON represents a JSON-serialized update Request
+type updateRequestJSON struct {
+ ID
+ ProtocolVersion uint8 `json:"protocolVersion"`
+ Data string `json:"data,omitempty"`
+ Signature string `json:"signature,omitempty"`
+}
+
+// Request layout
+// Update bytes
+// SignatureLength bytes
+const minimumSignedUpdateLength = minimumUpdateDataLength + signatureLength
+
+// NewFirstRequest returns a ready-to-sign request to publish a first feed update
+func NewFirstRequest(topic Topic) *Request {
+
+ request := new(Request)
+
+ // get the current time
+ now := TimestampProvider.Now().Time
+ request.Epoch = lookup.GetFirstEpoch(now)
+ request.Feed.Topic = topic
+ request.Header.Version = ProtocolVersion
+
+ return request
+}
+
+// SetData stores the payload data the feed update will be updated with
+func (r *Request) SetData(data []byte) {
+ r.data = data
+ r.Signature = nil
+}
+
+// IsUpdate returns true if this request models a signed update; otherwise it is a request to sign
+func (r *Request) IsUpdate() bool {
+ return r.Signature != nil
+}
+
+// Verify checks that signatures are valid
+func (r *Request) Verify() (err error) {
+ if len(r.data) == 0 {
+ return NewError(ErrInvalidValue, "Update does not contain data")
+ }
+ if r.Signature == nil {
+ return NewError(ErrInvalidSignature, "Missing signature field")
+ }
+
+ digest, err := r.GetDigest()
+ if err != nil {
+ return err
+ }
+
+ // get the address of the signer (which also checks that it's a valid signature)
+ r.Feed.User, err = getUserAddr(digest, *r.Signature)
+ if err != nil {
+ return err
+ }
+
+ // check that the lookup information contained in the chunk matches the updateAddr (chunk search key)
+ // that was used to retrieve this chunk
+ // if this validation fails, someone forged a chunk.
+ if !bytes.Equal(r.idAddr, r.Addr()) {
+ return NewError(ErrInvalidSignature, "Signature address does not match with update user address")
+ }
+
+ return nil
+}
+
+// Sign computes the signature of the update message and stores it in the request
+func (r *Request) Sign(signer Signer) error {
+ r.Feed.User = signer.Address()
+ r.binaryData = nil //invalidate serialized data
+ digest, err := r.GetDigest() // computes digest and serializes into .binaryData
+ if err != nil {
+ return err
+ }
+
+ signature, err := signer.Sign(digest)
+ if err != nil {
+ return err
+ }
+
+ // Although the Signer interface returns the public address of the signer,
+ // recover it from the signature to see if they match
+ userAddr, err := getUserAddr(digest, signature)
+ if err != nil {
+ return NewError(ErrInvalidSignature, "Error verifying signature")
+ }
+
+ if userAddr != signer.Address() { // sanity check to make sure the Signer is declaring the same address used to sign!
+ return NewError(ErrInvalidSignature, "Signer address does not match update user address")
+ }
+
+ r.Signature = &signature
+ r.idAddr = r.Addr()
+ return nil
+}
+
+// GetDigest creates the feed update digest used in signatures.
+// The serialized payload is cached in .binaryData
+func (r *Request) GetDigest() (result common.Hash, err error) {
+ hasher := hashPool.Get().(hash.Hash)
+ defer hashPool.Put(hasher)
+ hasher.Reset()
+ dataLength := r.Update.binaryLength()
+ if r.binaryData == nil {
+ r.binaryData = make([]byte, dataLength+signatureLength)
+ if err := r.Update.binaryPut(r.binaryData[:dataLength]); err != nil {
+ return result, err
+ }
+ }
+ hasher.Write(r.binaryData[:dataLength]) //everything except the signature.
+
+ return common.BytesToHash(hasher.Sum(nil)), nil
+}
+
+// toChunk creates an update chunk from this request
+func (r *Request) toChunk() (storage.Chunk, error) {
+
+ // Check that the update is signed and serialized
+ // For efficiency, data is serialized during signature and cached in
+	// the binaryData field when computing the signature digest in .GetDigest()
+ if r.Signature == nil || r.binaryData == nil {
+ return nil, NewError(ErrInvalidSignature, "toChunk called without a valid signature or payload data. Call .Sign() first.")
+ }
+
+ updateLength := r.Update.binaryLength()
+
+ // signature is the last item in the chunk data
+ copy(r.binaryData[updateLength:], r.Signature[:])
+
+ chunk := storage.NewChunk(r.idAddr, r.binaryData)
+ return chunk, nil
+}
+
+// fromChunk populates this structure from chunk data. It does not verify the signature is valid.
+func (r *Request) fromChunk(updateAddr storage.Address, chunkdata []byte) error {
+ // for update chunk layout see Request definition
+
+ //deserialize the feed update portion
+ if err := r.Update.binaryGet(chunkdata[:len(chunkdata)-signatureLength]); err != nil {
+ return err
+ }
+
+ // Extract the signature
+ var signature *Signature
+ cursor := r.Update.binaryLength()
+ sigdata := chunkdata[cursor : cursor+signatureLength]
+ if len(sigdata) > 0 {
+ signature = &Signature{}
+ copy(signature[:], sigdata)
+ }
+
+ r.Signature = signature
+ r.idAddr = updateAddr
+ r.binaryData = chunkdata
+
+ return nil
+
+}
+
+// FromValues deserializes this instance from a string key-value store
+// useful to parse query strings
+func (r *Request) FromValues(values Values, data []byte) error {
+ signatureBytes, err := hexutil.Decode(values.Get("signature"))
+ if err != nil {
+ r.Signature = nil
+ } else {
+ if len(signatureBytes) != signatureLength {
+ return NewError(ErrInvalidSignature, "Incorrect signature length")
+ }
+ r.Signature = new(Signature)
+ copy(r.Signature[:], signatureBytes)
+ }
+ err = r.Update.FromValues(values, data)
+ if err != nil {
+ return err
+ }
+ r.idAddr = r.Addr()
+ return err
+}
+
+// AppendValues serializes this structure into the provided string key-value store
+// useful to build query strings
+func (r *Request) AppendValues(values Values) []byte {
+ if r.Signature != nil {
+ values.Set("signature", hexutil.Encode(r.Signature[:]))
+ }
+ return r.Update.AppendValues(values)
+}
+
+// fromJSON takes an update request JSON and populates this Request
+func (r *Request) fromJSON(j *updateRequestJSON) error {
+
+ r.ID = j.ID
+ r.Header.Version = j.ProtocolVersion
+
+ var err error
+ if j.Data != "" {
+ r.data, err = hexutil.Decode(j.Data)
+ if err != nil {
+ return NewError(ErrInvalidValue, "Cannot decode data")
+ }
+ }
+
+ if j.Signature != "" {
+ sigBytes, err := hexutil.Decode(j.Signature)
+ if err != nil || len(sigBytes) != signatureLength {
+ return NewError(ErrInvalidSignature, "Cannot decode signature")
+ }
+ r.Signature = new(Signature)
+ r.idAddr = r.Addr()
+ copy(r.Signature[:], sigBytes)
+ }
+ return nil
+}
+
+// UnmarshalJSON takes a JSON structure stored in a byte array and populates the Request object
+// Implements json.Unmarshaler interface
+func (r *Request) UnmarshalJSON(rawData []byte) error {
+ var requestJSON updateRequestJSON
+ if err := json.Unmarshal(rawData, &requestJSON); err != nil {
+ return err
+ }
+ return r.fromJSON(&requestJSON)
+}
+
+// MarshalJSON takes an update request and encodes it as a JSON structure into a byte array
+// Implements json.Marshaler interface
+func (r *Request) MarshalJSON() (rawData []byte, err error) {
+ var signatureString, dataString string
+ if r.Signature != nil {
+ signatureString = hexutil.Encode(r.Signature[:])
+ }
+ if r.data != nil {
+ dataString = hexutil.Encode(r.data)
+ }
+
+ requestJSON := &updateRequestJSON{
+ ID: r.ID,
+ ProtocolVersion: r.Header.Version,
+ Data: dataString,
+ Signature: signatureString,
+ }
+
+ return json.Marshal(requestJSON)
+}
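+
+// A minimal sketch of the intended flow (signer is assumed to be any Signer
+// implementation, e.g. a GenericSigner from sign.go):
+//
+//	request := NewFirstRequest(topic)
+//	request.SetData([]byte("hello world"))
+//	if err := request.Sign(signer); err != nil {
+//		// handle error
+//	}
+//	// request.Verify() now passes and the signed update can be turned into a chunk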
diff --git a/swarm/storage/feeds/request_test.go b/swarm/storage/feeds/request_test.go
new file mode 100644
index 000000000..2e3783834
--- /dev/null
+++ b/swarm/storage/feeds/request_test.go
@@ -0,0 +1,312 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "bytes"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/storage/feeds/lookup"
+)
+
+func areEqualJSON(s1, s2 string) (bool, error) {
+ //credit for the trick: turtlemonvh https://gist.github.com/turtlemonvh/e4f7404e28387fadb8ad275a99596f67
+ var o1 interface{}
+ var o2 interface{}
+
+ err := json.Unmarshal([]byte(s1), &o1)
+ if err != nil {
+		return false, fmt.Errorf("Error unmarshalling string 1 :: %s", err.Error())
+ }
+ err = json.Unmarshal([]byte(s2), &o2)
+ if err != nil {
+		return false, fmt.Errorf("Error unmarshalling string 2 :: %s", err.Error())
+ }
+
+ return reflect.DeepEqual(o1, o2), nil
+}
+
+// TestEncodingDecodingUpdateRequests ensures that requests are serialized properly
+// while also checking cryptographically that only the owner of a Feed can update it.
+func TestEncodingDecodingUpdateRequests(t *testing.T) {
+
+ charlie := newCharlieSigner() //Charlie
+ bob := newBobSigner() //Bob
+
+	// Create a feed in our good guy Charlie's name
+ topic, _ := NewTopic("a good topic name", nil)
+ firstRequest := NewFirstRequest(topic)
+ firstRequest.User = charlie.Address()
+
+ // We now encode the create message to simulate we send it over the wire
+ messageRawData, err := firstRequest.MarshalJSON()
+ if err != nil {
+ t.Fatalf("Error encoding first feed update request: %s", err)
+ }
+
+ // ... the message arrives and is decoded...
+ var recoveredFirstRequest Request
+ if err := recoveredFirstRequest.UnmarshalJSON(messageRawData); err != nil {
+ t.Fatalf("Error decoding first feed update request: %s", err)
+ }
+
+ // ... but verification should fail because it is not signed!
+ if err := recoveredFirstRequest.Verify(); err == nil {
+ t.Fatal("Expected Verify to fail since the message is not signed")
+ }
+
+	// We now assume that the feed update was created and propagated.
+
+ const expectedSignature = "0x7235b27a68372ddebcf78eba48543fa460864b0b0e99cb533fcd3664820e603312d29426dd00fb39628f5299480a69bf6e462838d78de49ce0704c754c9deb2601"
+ const expectedJSON = `{"feed":{"topic":"0x6120676f6f6420746f706963206e616d65000000000000000000000000000000","user":"0x876a8936a7cd0b79ef0735ad0896c1afe278781c"},"epoch":{"time":1000,"level":1},"protocolVersion":0,"data":"0x5468697320686f75722773207570646174653a20537761726d2039392e3020686173206265656e2072656c656173656421"}`
+
+ //Put together an unsigned update request that we will serialize to send it to the signer.
+ data := []byte("This hour's update: Swarm 99.0 has been released!")
+ request := &Request{
+ Update: Update{
+ ID: ID{
+ Epoch: lookup.Epoch{
+ Time: 1000,
+ Level: 1,
+ },
+ Feed: firstRequest.Update.Feed,
+ },
+ data: data,
+ },
+ }
+
+ messageRawData, err = request.MarshalJSON()
+ if err != nil {
+ t.Fatalf("Error encoding update request: %s", err)
+ }
+
+ equalJSON, err := areEqualJSON(string(messageRawData), expectedJSON)
+ if err != nil {
+ t.Fatalf("Error decoding update request JSON: %s", err)
+ }
+ if !equalJSON {
+ t.Fatalf("Received a different JSON message. Expected %s, got %s", expectedJSON, string(messageRawData))
+ }
+
+	// now the encoded message messageRawData is sent over the wire and arrives at the signer
+
+ //Attempt to extract an UpdateRequest out of the encoded message
+ var recoveredRequest Request
+ if err := recoveredRequest.UnmarshalJSON(messageRawData); err != nil {
+ t.Fatalf("Error decoding update request: %s", err)
+ }
+
+ //sign the request and see if it matches our predefined signature above.
+ if err := recoveredRequest.Sign(charlie); err != nil {
+ t.Fatalf("Error signing request: %s", err)
+ }
+
+ compareByteSliceToExpectedHex(t, "signature", recoveredRequest.Signature[:], expectedSignature)
+
+ // mess with the signature and see what happens. To alter the signature, we briefly decode it as JSON
+ // to alter the signature field.
+ var j updateRequestJSON
+ if err := json.Unmarshal([]byte(expectedJSON), &j); err != nil {
+ t.Fatal("Error unmarshalling test json, check expectedJSON constant")
+ }
+ j.Signature = "Certainly not a signature"
+ corruptMessage, _ := json.Marshal(j) // encode the message with the bad signature
+ var corruptRequest Request
+ if err = corruptRequest.UnmarshalJSON(corruptMessage); err == nil {
+ t.Fatal("Expected DecodeUpdateRequest to fail when trying to interpret a corrupt message with an invalid signature")
+ }
+
+ // Now imagine Bob wants to create an update of his own about the same Feed,
+ // signing a message with his private key
+ if err := request.Sign(bob); err != nil {
+ t.Fatalf("Error signing: %s", err)
+ }
+
+ // Now Bob encodes the message to send it over the wire...
+ messageRawData, err = request.MarshalJSON()
+ if err != nil {
+ t.Fatalf("Error encoding message:%s", err)
+ }
+
+	// ... the message arrives at our Swarm node and it is decoded.
+ recoveredRequest = Request{}
+ if err := recoveredRequest.UnmarshalJSON(messageRawData); err != nil {
+ t.Fatalf("Error decoding message:%s", err)
+ }
+
+ // Before checking what happened with Bob's update, let's see what would happen if we mess
+ // with the signature big time to see if Verify catches it
+ savedSignature := *recoveredRequest.Signature // save the signature for later
+ binary.LittleEndian.PutUint64(recoveredRequest.Signature[5:], 556845463424) // write some random data to break the signature
+ if err = recoveredRequest.Verify(); err == nil {
+ t.Fatal("Expected Verify to fail on corrupt signature")
+ }
+
+	// restore Bob's signature from corruption
+ *recoveredRequest.Signature = savedSignature
+
+ // Now the signature is not corrupt
+ if err = recoveredRequest.Verify(); err != nil {
+ t.Fatal(err)
+ }
+
+ // Reuse object and sign with our friend Charlie's private key
+ if err := recoveredRequest.Sign(charlie); err != nil {
+ t.Fatalf("Error signing with the correct private key: %s", err)
+ }
+
+ // And now, Verify should work since this update now belongs to Charlie
+ if err = recoveredRequest.Verify(); err != nil {
+		t.Fatalf("Error verifying that Charlie can sign a reused request object: %s", err)
+ }
+
+ // mess with the lookup key to make sure Verify fails:
+ recoveredRequest.Time = 77999 // this will alter the lookup key
+ if err = recoveredRequest.Verify(); err == nil {
+ t.Fatalf("Expected Verify to fail since the lookup key has been altered")
+ }
+}
+
+func getTestRequest() *Request {
+ return &Request{
+ Update: *getTestFeedUpdate(),
+ }
+}
+
+func TestUpdateChunkSerializationErrorChecking(t *testing.T) {
+
+	// Test that fromChunk fails if the chunk is too small
+ var r Request
+ if err := r.fromChunk(storage.ZeroAddr, make([]byte, minimumUpdateDataLength-1+signatureLength)); err == nil {
+ t.Fatalf("Expected request.fromChunk to fail when chunkData contains less than %d bytes", minimumUpdateDataLength)
+ }
+
+ r = *getTestRequest()
+
+ _, err := r.toChunk()
+ if err == nil {
+ t.Fatal("Expected request.toChunk to fail when there is no data")
+ }
+ r.data = []byte("Al bien hacer jamás le falta premio") // put some arbitrary length data
+ _, err = r.toChunk()
+ if err == nil {
+ t.Fatal("expected request.toChunk to fail when there is no signature")
+ }
+
+ charlie := newCharlieSigner()
+ if err := r.Sign(charlie); err != nil {
+ t.Fatalf("error signing:%s", err)
+ }
+
+ chunk, err := r.toChunk()
+ if err != nil {
+ t.Fatalf("error creating update chunk:%s", err)
+ }
+
+ compareByteSliceToExpectedHex(t, "chunk", chunk.Data(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019416c206269656e206861636572206a616dc3a173206c652066616c7461207072656d696f5a0ffe0bc27f207cd5b00944c8b9cee93e08b89b5ada777f123ac535189333f174a6a4ca2f43a92c4a477a49d774813c36ce8288552c58e6205b0ac35d0507eb00")
+
+ var recovered Request
+ recovered.fromChunk(chunk.Address(), chunk.Data())
+ if !reflect.DeepEqual(recovered, r) {
+ t.Fatal("Expected recovered Request update to equal the original one")
+ }
+}
+
+// TestReverse checks that the signature address matches the update signer address
+func TestReverse(t *testing.T) {
+
+ epoch := lookup.Epoch{
+ Time: 7888,
+ Level: 6,
+ }
+
+ // make fake timeProvider
+ timeProvider := &fakeTimeProvider{
+ currentTime: startTime.Time,
+ }
+
+ // signer containing private key
+ signer := newAliceSigner()
+
+ // set up rpc and create Feeds handler
+ _, _, teardownTest, err := setupTest(timeProvider, signer)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer teardownTest()
+
+ topic, _ := NewTopic("Cervantes quotes", nil)
+ feed := Feed{
+ Topic: topic,
+ User: signer.Address(),
+ }
+
+ data := []byte("Donde una puerta se cierra, otra se abre")
+
+ request := new(Request)
+ request.Feed = feed
+ request.Epoch = epoch
+ request.data = data
+
+ // generate a chunk key for this request
+ key := request.Addr()
+
+ if err = request.Sign(signer); err != nil {
+ t.Fatal(err)
+ }
+
+ chunk, err := request.toChunk()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // check that we can recover the owner account from the update chunk's signature
+ var checkUpdate Request
+ if err := checkUpdate.fromChunk(chunk.Address(), chunk.Data()); err != nil {
+ t.Fatal(err)
+ }
+ checkdigest, err := checkUpdate.GetDigest()
+ if err != nil {
+ t.Fatal(err)
+ }
+ recoveredAddr, err := getUserAddr(checkdigest, *checkUpdate.Signature)
+ if err != nil {
+ t.Fatalf("Retrieve address from signature fail: %v", err)
+ }
+ originalAddr := crypto.PubkeyToAddress(signer.PrivKey.PublicKey)
+
+ // check that the metadata retrieved from the chunk matches what we gave it
+ if recoveredAddr != originalAddr {
+		t.Fatalf("addresses don't match: %x != %x", originalAddr, recoveredAddr)
+ }
+
+ if !bytes.Equal(key[:], chunk.Address()[:]) {
+ t.Fatalf("Expected chunk key '%x', was '%x'", key, chunk.Address())
+ }
+ if epoch != checkUpdate.Epoch {
+ t.Fatalf("Expected epoch to be '%s', was '%s'", epoch.String(), checkUpdate.Epoch.String())
+ }
+ if !bytes.Equal(data, checkUpdate.data) {
+ t.Fatalf("Expected data '%x', was '%x'", data, checkUpdate.data)
+ }
+}
diff --git a/swarm/storage/feeds/sign.go b/swarm/storage/feeds/sign.go
new file mode 100644
index 000000000..a69942f2b
--- /dev/null
+++ b/swarm/storage/feeds/sign.go
@@ -0,0 +1,75 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "crypto/ecdsa"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+)
+
+const signatureLength = 65
+
+// Signature is an alias for a static byte array with the size of a signature
+type Signature [signatureLength]byte
+
+// Signer signs Feed update payloads
+type Signer interface {
+ Sign(common.Hash) (Signature, error)
+ Address() common.Address
+}
+
+// GenericSigner implements the Signer interface
+// It is the vanilla signer that should probably be used in most cases
+type GenericSigner struct {
+ PrivKey *ecdsa.PrivateKey
+ address common.Address
+}
+
+// NewGenericSigner builds a signer that will sign everything with the provided private key
+func NewGenericSigner(privKey *ecdsa.PrivateKey) *GenericSigner {
+ return &GenericSigner{
+ PrivKey: privKey,
+ address: crypto.PubkeyToAddress(privKey.PublicKey),
+ }
+}
+
+// Sign signs the supplied data
+// It wraps the ethereum crypto.Sign() method
+func (s *GenericSigner) Sign(data common.Hash) (signature Signature, err error) {
+ signaturebytes, err := crypto.Sign(data.Bytes(), s.PrivKey)
+ if err != nil {
+ return
+ }
+ copy(signature[:], signaturebytes)
+ return
+}
+
+// Address returns the Ethereum address derived from the signer's private key
+func (s *GenericSigner) Address() common.Address {
+ return s.address
+}
+
+// getUserAddr extracts the address of the Feed update signer
+func getUserAddr(digest common.Hash, signature Signature) (common.Address, error) {
+ pub, err := crypto.SigToPub(digest.Bytes(), signature[:])
+ if err != nil {
+ return common.Address{}, err
+ }
+ return crypto.PubkeyToAddress(*pub), nil
+}
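+
+// A minimal sketch (digest stands for any common.Hash to be signed):
+//
+//	privKey, _ := crypto.GenerateKey()
+//	signer := NewGenericSigner(privKey)
+//	signature, _ := signer.Sign(digest)
+//	addr, _ := getUserAddr(digest, signature) // addr == signer.Address()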
diff --git a/swarm/storage/feeds/testutil.go b/swarm/storage/feeds/testutil.go
new file mode 100644
index 000000000..879f73348
--- /dev/null
+++ b/swarm/storage/feeds/testutil.go
@@ -0,0 +1,71 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const (
+ testDbDirName = "feeds"
+)
+
+type TestHandler struct {
+ *Handler
+}
+
+func (t *TestHandler) Close() {
+ t.chunkStore.Close()
+}
+
+type mockNetFetcher struct{}
+
+func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) {
+}
+func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
+}
+
+func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher {
+ return &mockNetFetcher{}
+}
+
+// NewTestHandler creates a Handler object to be used for testing purposes.
+func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) {
+ path := filepath.Join(datadir, testDbDirName)
+ fh := NewHandler(params)
+ localstoreparams := storage.NewDefaultLocalStoreParams()
+ localstoreparams.Init(path)
+ localStore, err := storage.NewLocalStore(localstoreparams, nil)
+ if err != nil {
+ return nil, fmt.Errorf("localstore create fail, path %s: %v", path, err)
+ }
+ localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)))
+ localStore.Validators = append(localStore.Validators, fh)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, err
+ }
+ netStore.NewNetFetcherFunc = newFakeNetFetcher
+ fh.SetStore(netStore)
+ return &TestHandler{fh}, nil
+}
diff --git a/swarm/storage/feeds/timestampprovider.go b/swarm/storage/feeds/timestampprovider.go
new file mode 100644
index 000000000..f6aa0775c
--- /dev/null
+++ b/swarm/storage/feeds/timestampprovider.go
@@ -0,0 +1,84 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "time"
+)
+
+// TimestampProvider is the time source of the feeds package
+var TimestampProvider timestampProvider = NewDefaultTimestampProvider()
+
+// Timestamp encodes a point in time as a Unix epoch timestamp
+type Timestamp struct {
+ Time uint64 `json:"time"` // Unix epoch timestamp, in seconds
+}
+
+// 8 bytes uint64 Time
+const timestampLength = 8
+
+// timestampProvider interface describes a source of timestamp information
+type timestampProvider interface {
+ Now() Timestamp // returns the current timestamp information
+}
+
+// binaryGet populates the timestamp structure from the given byte slice
+func (t *Timestamp) binaryGet(data []byte) error {
+ if len(data) != timestampLength {
+ return NewError(ErrCorruptData, "timestamp data has the wrong size")
+ }
+ t.Time = binary.LittleEndian.Uint64(data[:8])
+ return nil
+}
+
+// binaryPut serializes a Timestamp to a byte slice
+func (t *Timestamp) binaryPut(data []byte) error {
+ if len(data) != timestampLength {
+ return NewError(ErrCorruptData, "timestamp data has the wrong size")
+ }
+ binary.LittleEndian.PutUint64(data, t.Time)
+ return nil
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface
+func (t *Timestamp) UnmarshalJSON(data []byte) error {
+ return json.Unmarshal(data, &t.Time)
+}
+
+// MarshalJSON implements the json.Marshaler interface
+func (t *Timestamp) MarshalJSON() ([]byte, error) {
+ return json.Marshal(t.Time)
+}
+
+// DefaultTimestampProvider is a TimestampProvider that uses system time
+// as time source
+type DefaultTimestampProvider struct {
+}
+
+// NewDefaultTimestampProvider creates a system clock based timestamp provider
+func NewDefaultTimestampProvider() *DefaultTimestampProvider {
+ return &DefaultTimestampProvider{}
+}
+
+// Now returns the current time according to this provider
+func (dtp *DefaultTimestampProvider) Now() Timestamp {
+ return Timestamp{
+ Time: uint64(time.Now().Unix()),
+ }
+}
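+
+// A minimal sketch of plugging in a custom clock, e.g. for tests
+// (fixedClock is an illustrative name, not part of this package):
+//
+//	type fixedClock struct{ t uint64 }
+//
+//	func (c *fixedClock) Now() Timestamp { return Timestamp{Time: c.t} }
+//
+//	TimestampProvider = &fixedClock{t: 1533799046}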
diff --git a/swarm/storage/feeds/topic.go b/swarm/storage/feeds/topic.go
new file mode 100644
index 000000000..2dc8c18cd
--- /dev/null
+++ b/swarm/storage/feeds/topic.go
@@ -0,0 +1,105 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// TopicLength establishes the max length of a topic string
+const TopicLength = storage.AddressLength
+
+// Topic represents what a feed is about
+type Topic [TopicLength]byte
+
+// ErrTopicTooLong is returned when creating a topic with a name/related content too long
+var ErrTopicTooLong = fmt.Errorf("Topic is too long. Max length is %d", TopicLength)
+
+// NewTopic creates a new topic from a provided name and "related content" byte array,
+// merging the two together.
+// If relatedContent or name are longer than TopicLength, they will be truncated and an error returned
+// name can be an empty string
+// relatedContent can be nil
+func NewTopic(name string, relatedContent []byte) (topic Topic, err error) {
+ if relatedContent != nil {
+ contentLength := len(relatedContent)
+ if contentLength > TopicLength {
+ contentLength = TopicLength
+ err = ErrTopicTooLong
+ }
+ copy(topic[:], relatedContent[:contentLength])
+ }
+ nameBytes := []byte(name)
+ nameLength := len(nameBytes)
+ if nameLength > TopicLength {
+ nameLength = TopicLength
+ err = ErrTopicTooLong
+ }
+ bitutil.XORBytes(topic[:], topic[:], nameBytes[:nameLength])
+ return topic, err
+}
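+
+// A minimal sketch (the topic name is an illustrative example):
+//
+//	topic, _ := NewTopic("weather-reports", nil)
+//	// topic now holds the UTF-8 bytes of the name, zero-padded to TopicLength,
+//	// and topic.Hex() renders it as a 0x-prefixed hex string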
+
+// Hex will return the topic encoded as a hex string
+func (t *Topic) Hex() string {
+ return hexutil.Encode(t[:])
+}
+
+// FromHex will parse a hex string into this Topic instance
+func (t *Topic) FromHex(hex string) error {
+ bytes, err := hexutil.Decode(hex)
+ if err != nil || len(bytes) != len(t) {
+ return NewErrorf(ErrInvalidValue, "Cannot decode topic")
+ }
+ copy(t[:], bytes)
+ return nil
+}
+
+// Name will try to extract the topic name out of the Topic
+func (t *Topic) Name(relatedContent []byte) string {
+ nameBytes := *t
+ if relatedContent != nil {
+ contentLength := len(relatedContent)
+ if contentLength > TopicLength {
+ contentLength = TopicLength
+ }
+ bitutil.XORBytes(nameBytes[:], t[:], relatedContent[:contentLength])
+ }
+ z := bytes.IndexByte(nameBytes[:], 0)
+ if z < 0 {
+ z = TopicLength
+ }
+ return string(nameBytes[:z])
+
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface
+func (t *Topic) UnmarshalJSON(data []byte) error {
+ var hex string
+ json.Unmarshal(data, &hex)
+ return t.FromHex(hex)
+}
+
+// MarshalJSON implements the json.Marshaler interface
+func (t *Topic) MarshalJSON() ([]byte, error) {
+ return json.Marshal(t.Hex())
+}
diff --git a/swarm/storage/feeds/topic_test.go b/swarm/storage/feeds/topic_test.go
new file mode 100644
index 000000000..8994002d7
--- /dev/null
+++ b/swarm/storage/feeds/topic_test.go
@@ -0,0 +1,50 @@
+package feeds
+
+import (
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+)
+
+func TestTopic(t *testing.T) {
+ related, _ := hexutil.Decode("0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789")
+ topicName := "test-topic"
+ topic, _ := NewTopic(topicName, related)
+ hex := topic.Hex()
+ expectedHex := "0xdfa89c750e3108f9c2aeef0123456789abcdef0123456789abcdef0123456789"
+ if hex != expectedHex {
+ t.Fatalf("Expected %s, got %s", expectedHex, hex)
+ }
+
+ var topic2 Topic
+ topic2.FromHex(hex)
+ if topic2 != topic {
+ t.Fatal("Expected recovered topic to be equal to original one")
+ }
+
+ if topic2.Name(related) != topicName {
+ t.Fatal("Retrieved name does not match")
+ }
+
+ bytes, err := topic2.MarshalJSON()
+ if err != nil {
+ t.Fatal(err)
+ }
+ expectedJSON := `"0xdfa89c750e3108f9c2aeef0123456789abcdef0123456789abcdef0123456789"`
+ equal, err := areEqualJSON(expectedJSON, string(bytes))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !equal {
+ t.Fatalf("Expected JSON to be %s, got %s", expectedJSON, string(bytes))
+ }
+
+ err = topic2.UnmarshalJSON(bytes)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if topic2 != topic {
+ t.Fatal("Expected recovered topic to be equal to original one")
+ }
+
+}
diff --git a/swarm/storage/feeds/update.go b/swarm/storage/feeds/update.go
new file mode 100644
index 000000000..02bd37522
--- /dev/null
+++ b/swarm/storage/feeds/update.go
@@ -0,0 +1,132 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/ethereum/go-ethereum/swarm/chunk"
+)
+
+// ProtocolVersion defines the current version of the protocol that will be included in each update message
+const ProtocolVersion uint8 = 0
+
+// headerLength is the size of the update header in bytes: one version byte plus padding
+const headerLength = 8
+
+// Header defines an update message header including a protocol version byte
+type Header struct {
+ Version uint8 // Protocol version
+ Padding [headerLength - 1]uint8 // reserved for future use
+}
+
+// Update encapsulates the information sent as part of a feed update
+type Update struct {
+ Header Header // Update header (protocol version and reserved padding)
+ ID // Feed Update identifying information
+ data []byte // actual data payload
+}
+
+// minimumUpdateDataLength is the size of the header and ID plus at least one byte of payload
+const minimumUpdateDataLength = idLength + headerLength + 1
+
+// maxUpdateDataLength is the largest payload that still fits in a default chunk once the signature, ID and header are accounted for
+const maxUpdateDataLength = chunk.DefaultSize - signatureLength - idLength - headerLength
+
+// binaryPut serializes the feed update information into the given slice
+func (r *Update) binaryPut(serializedData []byte) error {
+ datalength := len(r.data)
+ if datalength == 0 {
+ return NewError(ErrInvalidValue, "a feed update must contain data")
+ }
+
+ if datalength > maxUpdateDataLength {
+ return NewErrorf(ErrInvalidValue, "feed update data is too big (length=%d). Max length=%d", datalength, maxUpdateDataLength)
+ }
+
+ if len(serializedData) != r.binaryLength() {
+ return NewErrorf(ErrInvalidValue, "slice passed to putBinary must be of exact size. Expected %d bytes", r.binaryLength())
+ }
+
+ var cursor int
+ // serialize Header
+ serializedData[cursor] = r.Header.Version
+ copy(serializedData[cursor+1:headerLength], r.Header.Padding[:headerLength-1])
+ cursor += headerLength
+
+ // serialize ID
+ if err := r.ID.binaryPut(serializedData[cursor : cursor+idLength]); err != nil {
+ return err
+ }
+ cursor += idLength
+
+ // add the data
+ copy(serializedData[cursor:], r.data)
+ cursor += datalength
+
+ return nil
+}
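+
+// For reference, binaryPut above and binaryGet below lay an update out as a
+// flat byte sequence inside the chunk payload:
+//
+//   offset 0:                       Header.Version (1 byte) followed by Header.Padding (headerLength-1 bytes)
+//   offset headerLength:            ID (idLength bytes)
+//   offset headerLength + idLength: data (at least 1 byte, at most maxUpdateDataLength bytes)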
+
+// binaryLength returns the expected number of bytes this structure will take to encode
+func (r *Update) binaryLength() int {
+ return idLength + headerLength + len(r.data)
+}
+
+// binaryGet populates this instance from the information contained in the passed byte slice
+func (r *Update) binaryGet(serializedData []byte) error {
+ if len(serializedData) < minimumUpdateDataLength {
+ return NewErrorf(ErrNothingToReturn, "chunk less than %d bytes cannot be a feed update chunk", minimumUpdateDataLength)
+ }
+ dataLength := len(serializedData) - idLength - headerLength
+ // the minimum length check above guarantees dataLength is at least 1
+
+ var cursor int
+
+ // deserialize Header
+ r.Header.Version = serializedData[cursor] // extract the protocol version
+ copy(r.Header.Padding[:headerLength-1], serializedData[cursor+1:headerLength]) // extract the padding
+ cursor += headerLength
+
+ if err := r.ID.binaryGet(serializedData[cursor : cursor+idLength]); err != nil {
+ return err
+ }
+ cursor += idLength
+
+ data := serializedData[cursor : cursor+dataLength]
+ cursor += dataLength
+
+ // now that all checks have passed, copy data into structure
+ r.data = make([]byte, dataLength)
+ copy(r.data, data)
+
+ return nil
+}
+
+// FromValues deserializes this instance from a string key-value store
+// useful to parse query strings
+func (r *Update) FromValues(values Values, data []byte) error {
+ r.data = data
+ // a missing or malformed protocolVersion value is tolerated here and leaves the version at 0
+ version, _ := strconv.ParseUint(values.Get("protocolVersion"), 10, 32)
+ r.Header.Version = uint8(version)
+ return r.ID.FromValues(values)
+}
+
+// AppendValues serializes this structure into the provided string key-value store
+// useful to build query strings
+func (r *Update) AppendValues(values Values) []byte {
+ r.ID.AppendValues(values)
+ values.Set("protocolVersion", fmt.Sprintf("%d", r.Header.Version))
+ return r.data
+}
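+
+// What follows is an illustrative sketch only (not part of the original
+// change): it shows an Update making a round trip through a string key-value
+// store, the mechanism used to carry updates in query strings. mapValues and
+// exampleUpdateValuesRoundTrip are hypothetical helpers that exist solely for
+// this sketch; nothing else refers to them.
+type mapValues map[string]string
+
+func (m mapValues) Get(key string) string { return m[key] }
+func (m mapValues) Set(key, value string) { m[key] = value }
+
+func exampleUpdateValuesRoundTrip(src *Update) (*Update, error) {
+ values := make(mapValues)
+ // AppendValues stores the ID fields and protocolVersion in values and hands
+ // back the raw payload for the caller to transport out of band.
+ data := src.AppendValues(values)
+ var dst Update
+ if err := dst.FromValues(values, data); err != nil {
+ return nil, err
+ }
+ return &dst, nil
+}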
diff --git a/swarm/storage/feeds/update_test.go b/swarm/storage/feeds/update_test.go
new file mode 100644
index 000000000..7763da0c8
--- /dev/null
+++ b/swarm/storage/feeds/update_test.go
@@ -0,0 +1,50 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package feeds
+
+import (
+ "testing"
+)
+
+func getTestFeedUpdate() *Update {
+ return &Update{
+ ID: *getTestID(),
+ data: []byte("El que lee mucho y anda mucho, ve mucho y sabe mucho"),
+ }
+}
+
+func TestUpdateSerializer(t *testing.T) {
+ testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f")
+}
+
+func TestUpdateLengthCheck(t *testing.T) {
+ testBinarySerializerLengthCheck(t, getTestFeedUpdate())
+ // Test that binaryPut fails if the update is too big
+ update := getTestFeedUpdate()
+ update.data = make([]byte, maxUpdateDataLength+100)
+ serialized := make([]byte, update.binaryLength())
+ if err := update.binaryPut(serialized); err == nil {
+ t.Fatal("Expected update.binaryPut to fail since update is too big")
+ }
+
+ // Test that binaryPut fails if the data is empty or nil
+ update.data = nil
+ serialized = make([]byte, update.binaryLength())
+ if err := update.binaryPut(serialized); err == nil {
+ t.Fatal("Expected update.binaryPut to fail since data is empty")
+ }
+}