diff options
author | Balint Gabor <balint.g@gmail.com> | 2018-09-13 17:42:19 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-13 17:42:19 +0800 |
commit | 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch) | |
tree | 62a2896b3b824449595272f0b92dda877ba1c58d /swarm | |
parent | ff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff) | |
download | dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip |
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org>
Co-authored-by: Balint Gabor <balint.g@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Viktor Trón <viktor.tron@gmail.com>
Diffstat (limited to 'swarm')
52 files changed, 3136 insertions, 1868 deletions
diff --git a/swarm/api/api.go b/swarm/api/api.go index d733ad989..d7b6d8419 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -250,13 +250,6 @@ func NewAPI(fileStore *storage.FileStore, dns Resolver, resourceHandler *mru.Han return } -// Upload to be used only in TEST -func (a *API) Upload(ctx context.Context, uploadDir, index string, toEncrypt bool) (hash string, err error) { - fs := NewFileSystem(a) - hash, err = fs.Upload(uploadDir, index, toEncrypt) - return hash, err -} - // Retrieve FileStore reader API func (a *API) Retrieve(ctx context.Context, addr storage.Address) (reader storage.LazySectionReader, isEncrypted bool) { return a.fileStore.Retrieve(ctx, addr) diff --git a/swarm/api/config.go b/swarm/api/config.go index 3044dc2e5..baa13105a 100644 --- a/swarm/api/config.go +++ b/swarm/api/config.go @@ -62,6 +62,7 @@ type Config struct { NetworkID uint64 SwapEnabled bool SyncEnabled bool + SyncingSkipCheck bool DeliverySkipCheck bool LightNodeEnabled bool SyncUpdateDelay time.Duration @@ -89,7 +90,8 @@ func NewConfig() (c *Config) { NetworkID: network.DefaultNetworkID, SwapEnabled: false, SyncEnabled: true, - DeliverySkipCheck: false, + SyncingSkipCheck: false, + DeliverySkipCheck: true, SyncUpdateDelay: 15 * time.Second, SwapAPI: "", } diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index efefa9fae..2acdbd1ad 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -477,12 +477,12 @@ func testBzzGetPath(encrypted bool, t *testing.T) { var wait func(context.Context) error ctx := context.TODO() addr[i], wait, err = srv.FileStore.Store(ctx, reader[i], int64(len(mf)), encrypted) - for j := i + 1; j < len(testmanifest); j++ { - testmanifest[j] = strings.Replace(testmanifest[j], fmt.Sprintf("<key%v>", i), addr[i].Hex(), -1) - } if err != nil { t.Fatal(err) } + for j := i + 1; j < len(testmanifest); j++ { + testmanifest[j] = strings.Replace(testmanifest[j], fmt.Sprintf("<key%v>", i), addr[i].Hex(), -1) + } 
err = wait(ctx) if err != nil { t.Fatal(err) diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go index a1329a800..d44ad2277 100644 --- a/swarm/api/manifest.go +++ b/swarm/api/manifest.go @@ -69,9 +69,12 @@ func (a *API) NewManifest(ctx context.Context, toEncrypt bool) (storage.Address, if err != nil { return nil, err } - key, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), toEncrypt) - wait(ctx) - return key, err + addr, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), toEncrypt) + if err != nil { + return nil, err + } + err = wait(ctx) + return addr, err } // Manifest hack for supporting Mutable Resource Updates from the bzz: scheme @@ -87,8 +90,12 @@ func (a *API) NewResourceManifest(ctx context.Context, resourceAddr string) (sto if err != nil { return nil, err } - key, _, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), false) - return key, err + addr, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), false) + if err != nil { + return nil, err + } + err = wait(ctx) + return addr, err } // ManifestWriter is used to add and remove entries from an underlying manifest @@ -106,21 +113,26 @@ func (a *API) NewManifestWriter(ctx context.Context, addr storage.Address, quitC return &ManifestWriter{a, trie, quitC}, nil } -// AddEntry stores the given data and adds the resulting key to the manifest -func (m *ManifestWriter) AddEntry(ctx context.Context, data io.Reader, e *ManifestEntry) (key storage.Address, err error) { +// AddEntry stores the given data and adds the resulting address to the manifest +func (m *ManifestWriter) AddEntry(ctx context.Context, data io.Reader, e *ManifestEntry) (addr storage.Address, err error) { entry := newManifestTrieEntry(e, nil) if data != nil { - key, _, err = m.api.Store(ctx, data, e.Size, m.trie.encrypted) + var wait func(context.Context) error + addr, wait, err = m.api.Store(ctx, data, e.Size, m.trie.encrypted) + if err != nil { + return nil, err + } + err = 
wait(ctx) if err != nil { return nil, err } - entry.Hash = key.Hex() + entry.Hash = addr.Hex() } if entry.Hash == "" { - return key, errors.New("missing entry hash") + return addr, errors.New("missing entry hash") } m.trie.addEntry(entry, m.quitC) - return key, nil + return addr, nil } // RemoveEntry removes the given path from the manifest @@ -129,7 +141,7 @@ func (m *ManifestWriter) RemoveEntry(path string) error { return nil } -// Store stores the manifest, returning the resulting storage key +// Store stores the manifest, returning the resulting storage address func (m *ManifestWriter) Store() (storage.Address, error) { return m.trie.ref, m.trie.recalcAndStore() } @@ -211,51 +223,51 @@ type manifestTrieEntry struct { subtrie *manifestTrie } -func loadManifest(ctx context.Context, fileStore *storage.FileStore, hash storage.Address, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand - log.Trace("manifest lookup", "key", hash) +func loadManifest(ctx context.Context, fileStore *storage.FileStore, addr storage.Address, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand + log.Trace("manifest lookup", "addr", addr) // retrieve manifest via FileStore - manifestReader, isEncrypted := fileStore.Retrieve(ctx, hash) - log.Trace("reader retrieved", "key", hash) - return readManifest(manifestReader, hash, fileStore, isEncrypted, quitC, decrypt) + manifestReader, isEncrypted := fileStore.Retrieve(ctx, addr) + log.Trace("reader retrieved", "addr", addr) + return readManifest(manifestReader, addr, fileStore, isEncrypted, quitC, decrypt) } -func readManifest(mr storage.LazySectionReader, hash storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand +func readManifest(mr storage.LazySectionReader, addr 
storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand // TODO check size for oversized manifests size, err := mr.Size(mr.Context(), quitC) if err != nil { // size == 0 // can't determine size means we don't have the root chunk - log.Trace("manifest not found", "key", hash) + log.Trace("manifest not found", "addr", addr) err = fmt.Errorf("Manifest not Found") return } if size > manifestSizeLimit { - log.Warn("manifest exceeds size limit", "key", hash, "size", size, "limit", manifestSizeLimit) + log.Warn("manifest exceeds size limit", "addr", addr, "size", size, "limit", manifestSizeLimit) err = fmt.Errorf("Manifest size of %v bytes exceeds the %v byte limit", size, manifestSizeLimit) return } manifestData := make([]byte, size) read, err := mr.Read(manifestData) if int64(read) < size { - log.Trace("manifest not found", "key", hash) + log.Trace("manifest not found", "addr", addr) if err == nil { err = fmt.Errorf("Manifest retrieval cut short: read %v, expect %v", read, size) } return } - log.Debug("manifest retrieved", "key", hash) + log.Debug("manifest retrieved", "addr", addr) var man struct { Entries []*manifestTrieEntry `json:"entries"` } err = json.Unmarshal(manifestData, &man) if err != nil { - err = fmt.Errorf("Manifest %v is malformed: %v", hash.Log(), err) - log.Trace("malformed manifest", "key", hash) + err = fmt.Errorf("Manifest %v is malformed: %v", addr.Log(), err) + log.Trace("malformed manifest", "addr", addr) return } - log.Trace("manifest entries", "key", hash, "len", len(man.Entries)) + log.Trace("manifest entries", "addr", addr, "len", len(man.Entries)) trie = &manifestTrie{ fileStore: fileStore, @@ -406,12 +418,12 @@ func (mt *manifestTrie) recalcAndStore() error { sr := bytes.NewReader(manifest) ctx := context.TODO() - key, wait, err2 := mt.fileStore.Store(ctx, sr, int64(len(manifest)), mt.encrypted) + addr, wait, 
err2 := mt.fileStore.Store(ctx, sr, int64(len(manifest)), mt.encrypted) if err2 != nil { return err2 } err2 = wait(ctx) - mt.ref = key + mt.ref = addr return err2 } diff --git a/swarm/fuse/swarmfs_test.go b/swarm/fuse/swarmfs_test.go index 6efeb78d9..87d918550 100644 --- a/swarm/fuse/swarmfs_test.go +++ b/swarm/fuse/swarmfs_test.go @@ -20,7 +20,6 @@ package fuse import ( "bytes" - "context" "crypto/rand" "flag" "fmt" @@ -111,7 +110,7 @@ func createTestFilesAndUploadToSwarm(t *testing.T, api *api.API, files map[strin } //upload directory to swarm and return hash - bzzhash, err := api.Upload(context.TODO(), uploadDir, "", toEncrypt) + bzzhash, err := Upload(uploadDir, "", api, toEncrypt) if err != nil { t.Fatalf("Error uploading directory %v: %vm encryption: %v", uploadDir, err, toEncrypt) } @@ -1695,3 +1694,9 @@ func TestFUSE(t *testing.T) { t.Run("appendFileContentsToEndNonEncrypted", ta.appendFileContentsToEndNonEncrypted) } } + +func Upload(uploadDir, index string, a *api.API, toEncrypt bool) (hash string, err error) { + fs := api.NewFileSystem(a) + hash, err = fs.Upload(uploadDir, index, toEncrypt) + return hash, err +} diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go new file mode 100644 index 000000000..35e2f0132 --- /dev/null +++ b/swarm/network/fetcher.go @@ -0,0 +1,305 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. 
+// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +var searchTimeout = 1 * time.Second + +// Time to consider peer to be skipped. +// Also used in stream delivery. +var RequestTimeout = 10 * time.Second + +type RequestFunc func(context.Context, *Request) (*discover.NodeID, chan struct{}, error) + +// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and +// keeps it alive until all active requests are completed. This can happen: +// 1. either because the chunk is delivered +// 2. or becuse the requestor cancelled/timed out +// Fetcher self destroys itself after it is completed. +// TODO: cancel all forward requests after termination +type Fetcher struct { + protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk + addr storage.Address // the address of the chunk to be fetched + offerC chan *discover.NodeID // channel of sources (peer node id strings) + requestC chan struct{} + skipCheck bool +} + +type Request struct { + Addr storage.Address // chunk address + Source *discover.NodeID // nodeID of peer to request from (can be nil) + SkipCheck bool // whether to offer the chunk first or deliver directly + peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil) +} + +// NewRequest returns a new instance of Request based on chunk address skip check and +// a map of peers to skip. 
+func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request { + return &Request{ + Addr: addr, + SkipCheck: skipCheck, + peersToSkip: peersToSkip, + } +} + +// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk. +// Peers to skip are kept per Request and for a time period of RequestTimeout. +// This function is used in stream package in Delivery.RequestFromPeers to optimize +// requests for chunks. +func (r *Request) SkipPeer(nodeID string) bool { + val, ok := r.peersToSkip.Load(nodeID) + if !ok { + return false + } + t, ok := val.(time.Time) + if ok && time.Now().After(t.Add(RequestTimeout)) { + // deadine expired + r.peersToSkip.Delete(nodeID) + return false + } + return true +} + +// FetcherFactory is initialised with a request function and can create fetchers +type FetcherFactory struct { + request RequestFunc + skipCheck bool +} + +// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory +func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { + return &FetcherFactory{ + request: request, + skipCheck: skipCheck, + } +} + +// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to +// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting +// this chunk, to make sure we don't request back the chunks from them. +// The created Fetcher is started and returned. +func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher { + fetcher := NewFetcher(source, f.request, f.skipCheck) + go fetcher.run(ctx, peersToSkip) + return fetcher +} + +// NewFetcher creates a new Fetcher for the given chunk address using the given request function. 
+func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { + return &Fetcher{ + addr: addr, + protoRequestFunc: rf, + offerC: make(chan *discover.NodeID), + requestC: make(chan struct{}), + skipCheck: skipCheck, + } +} + +// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally. +func (f *Fetcher) Offer(ctx context.Context, source *discover.NodeID) { + // First we need to have this select to make sure that we return if context is done + select { + case <-ctx.Done(): + return + default: + } + + // This select alone would not guarantee that we return of context is done, it could potentially + // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.offerC <- source: + case <-ctx.Done(): + } +} + +// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally. +func (f *Fetcher) Request(ctx context.Context) { + // First we need to have this select to make sure that we return if context is done + select { + case <-ctx.Done(): + return + default: + } + + // This select alone would not guarantee that we return of context is done, it could potentially + // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.requestC <- struct{}{}: + case <-ctx.Done(): + } +} + +// start prepares the Fetcher +// it keeps the Fetcher alive within the lifecycle of the passed context +func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { + var ( + doRequest bool // determines if retrieval is initiated in the current iteration + wait *time.Timer // timer for search timeout + waitC <-chan time.Time // timer channel + sources []*discover.NodeID // known sources, ie. 
peers that offered the chunk + requested bool // true if the chunk was actually requested + ) + gone := make(chan *discover.NodeID) // channel to signal that a peer we requested from disconnected + + // loop that keeps the fetching process alive + // after every request a timer is set. If this goes off we request again from another peer + // note that the previous request is still alive and has the chance to deliver, so + // rerequesting extends the search. ie., + // if a peer we requested from is gone we issue a new request, so the number of active + // requests never decreases + for { + select { + + // incoming offer + case source := <-f.offerC: + log.Trace("new source", "peer addr", source, "request addr", f.addr) + // 1) the chunk is offered by a syncing peer + // add to known sources + sources = append(sources, source) + // launch a request to the source iff the chunk was requested (not just expected because its offered by a syncing peer) + doRequest = requested + + // incoming request + case <-f.requestC: + log.Trace("new request", "request addr", f.addr) + // 2) chunk is requested, set requested flag + // launch a request iff none been launched yet + doRequest = !requested + requested = true + + // peer we requested from is gone. 
fall back to another + // and remove the peer from the peers map + case id := <-gone: + log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr) + peers.Delete(id.String()) + doRequest = requested + + // search timeout: too much time passed since the last request, + // extend the search to a new peer if we can find one + case <-waitC: + log.Trace("search timed out: rerequesting", "request addr", f.addr) + doRequest = requested + + // all Fetcher context closed, can quit + case <-ctx.Done(): + log.Trace("terminate fetcher", "request addr", f.addr) + // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from) + return + } + + // need to issue a new request + if doRequest { + var err error + sources, err = f.doRequest(ctx, gone, peers, sources) + if err != nil { + log.Warn("unable to request", "request addr", f.addr, "err", err) + } + } + + // if wait channel is not set, set it to a timer + if requested { + if wait == nil { + wait = time.NewTimer(searchTimeout) + defer wait.Stop() + waitC = wait.C + } else { + // stop the timer and drain the channel if it was not drained earlier + if !wait.Stop() { + select { + case <-wait.C: + default: + } + } + // reset the timer to go off after searchTimeout + wait.Reset(searchTimeout) + } + } + doRequest = false + } +} + +// doRequest attempts at finding a peer to request the chunk from +// * first it tries to request explicitly from peers that are known to have offered the chunk +// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address +// excluding those in the peersToSkip map +// * if no such peer is found an error is returned +// +// if a request is successful, +// * the peer's address is added to the set of peers to skip +// * the peer's address is removed from prospective sources, and +// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer) +func (f *Fetcher) 
doRequest(ctx context.Context, gone chan *discover.NodeID, peersToSkip *sync.Map, sources []*discover.NodeID) ([]*discover.NodeID, error) { + var i int + var sourceID *discover.NodeID + var quit chan struct{} + + req := &Request{ + Addr: f.addr, + SkipCheck: f.skipCheck, + peersToSkip: peersToSkip, + } + + foundSource := false + // iterate over known sources + for i = 0; i < len(sources); i++ { + req.Source = sources[i] + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err == nil { + // remove the peer from known sources + // Note: we can modify the source although we are looping on it, because we break from the loop immediately + sources = append(sources[:i], sources[i+1:]...) + foundSource = true + break + } + } + + // if there are no known sources, or none available, we try request from a closest node + if !foundSource { + req.Source = nil + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err != nil { + // if no peers found to request from + return sources, err + } + } + // add peer to the set of peers to skip from now + peersToSkip.Store(sourceID.String(), time.Now()) + + // if the quit channel is closed, it indicates that the source peer we requested from + // disconnected or terminated its streamer + // here start a go routine that watches this channel and reports the source peer on the gone channel + // this go routine quits if the fetcher global context is done to prevent process leak + go func() { + select { + case <-quit: + gone <- sourceID + case <-ctx.Done(): + } + }() + return sources, nil +} diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go new file mode 100644 index 000000000..21b81d652 --- /dev/null +++ b/swarm/network/fetcher_test.go @@ -0,0 +1,459 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +var requestedPeerID = discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") +var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + +// mockRequester pushes every request to the requestC channel when its doRequest function is called +type mockRequester struct { + // requests []Request + requestC chan *Request // when a request is coming it is pushed to requestC + waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional) + ctr int //counts the number of requests + quitC chan struct{} +} + +func newMockRequester(waitTimes ...time.Duration) *mockRequester { + return &mockRequester{ + requestC: make(chan *Request), + waitTimes: waitTimes, + quitC: make(chan struct{}), + } +} + +func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*discover.NodeID, chan struct{}, error) { + waitTime := time.Duration(0) + if m.ctr < len(m.waitTimes) { + waitTime = 
m.waitTimes[m.ctr] + m.ctr++ + } + time.Sleep(waitTime) + m.requestC <- request + + // if there is a Source in the request use that, if not use the global requestedPeerId + source := request.Source + if source == nil { + source = &requestedPeerID + } + return source, m.quitC, nil +} + +// TestFetcherSingleRequest creates a Fetcher using mockRequester, and run it with a sample set of peers to skip. +// mockRequester pushes a Request on a channel every time the request function is called. Using +// this channel we test if calling Fetcher.Request calls the request function, and whether it uses +// the correct peers to skip which we provided for the fetcher.run function. +func TestFetcherSingleRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peers := []string{"a", "b", "c", "d"} + peersToSkip := &sync.Map{} + for _, p := range peers { + peersToSkip.Store(p, time.Now()) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + fetcher.Request(rctx) + + select { + case request := <-requester.requestC: + // request should contain all peers from peersToSkip provided to the fetcher + for _, p := range peers { + if _, ok := request.peersToSkip.Load(p); !ok { + t.Fatalf("request.peersToSkip misses peer") + } + } + + // source peer should be also added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(requestedPeerID.String()); !ok { + t.Fatalf("request.peersToSkip does not contain peer returned by the request function") + } + + // fetch should trigger a request, if it doesn't happen in time, test should fail + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetch timeout") + } +} + +// TestCancelStopsFetcher tests that a cancelled fetcher does not initiate further requests even if its fetch function is called +func 
TestFetcherCancelStopsFetcher(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + + // we start the fetcher, and then we immediately cancel the context + go fetcher.run(ctx, peersToSkip) + cancel() + + rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer rcancel() + // we call Request with an active context + fetcher.Request(rctx) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } +} + +// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request +func TestFetcherCancelStopsRequest(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // we start the fetcher with an active context + go fetcher.run(ctx, peersToSkip) + + rctx, rcancel := context.WithCancel(context.Background()) + rcancel() + + // we call Request with a cancelled context + fetcher.Request(rctx) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetch function initiated request") + case <-time.After(200 * time.Millisecond): + } + + // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled + rctx = context.Background() + fetcher.Request(rctx) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("expected 
request") + } +} + +// TestOfferUsesSource tests Fetcher Offer behavior. +// In this case there should be 1 (and only one) request initiated from the source peer, and the +// source nodeid should appear in the peersToSkip map. +func TestFetcherOfferUsesSource(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + // call the Offer function with the source peer + fetcher.Offer(rctx, &sourcePeerID) + + // fetcher should not initiate request + select { + case <-requester.requestC: + t.Fatalf("fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } + + // call Request after the Offer + rctx = context.Background() + fetcher.Request(rctx) + + // there should be exactly 1 request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Expected source id %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + select { + case <-requester.requestC: + t.Fatalf("Fetcher number of requests expected 1 got 2") + case <-time.After(200 * time.Millisecond): + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer 
cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + // call Request first + rctx := context.Background() + fetcher.Request(rctx) + + // there should be a request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if request.Source != nil { + t.Fatalf("Incorrect source peer id, expected nil got %v", request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // after the Request call Offer + fetcher.Offer(context.Background(), &sourcePeerID) + + // there should be a request coming from fetcher + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Incorrect source peer id, expected %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +// TestFetcherRetryOnTimeout tests that fetch retries after searchTimeOut has passed +func TestFetcherRetryOnTimeout(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + // set searchTimeOut to low value so the test is quicker + defer func(t time.Duration) { + searchTimeout = t + }(searchTimeout) + searchTimeout = 250 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + // call the fetch function with an active context + rctx := context.Background() + fetcher.Request(rctx) + + // after 100ms the first request should be initiated + time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did 
not initiate request") + } + + // after another 100ms no new request should be initiated, because search timeout is 250ms + time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + t.Fatalf("unexpected request from fetcher") + default: + } + + // after another 300ms search timeout is over, there should be a new request + time.Sleep(300 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did not retry request") + } +} + +// TestFetcherFactory creates a FetcherFactory and checks if the factory really creates and starts +// a Fetcher when it return a fetch function. We test the fetching functionality just by checking if +// a request is initiated when the fetch function is called +func TestFetcherFactory(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcherFactory := NewFetcherFactory(requester.doRequest, false) + + peersToSkip := &sync.Map{} + + fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip) + + fetcher.Request(context.Background()) + + // check if the created fetchFunction really starts a fetcher and initiates a request + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetch timeout") + } + +} + +func TestFetcherRequestQuitRetriesRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + // make sure searchTimeout is long so it is sure the request is not retried because of timeout + defer func(t time.Duration) { + searchTimeout = t + }(searchTimeout) + searchTimeout = 10 * time.Second + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + fetcher.Request(rctx) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not 
initiated") + } + + close(requester.quitC) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not initiated after failed request") + } +} + +// TestRequestSkipPeer checks if PeerSkip function will skip provided peer +// and not skip unknown one. +func TestRequestSkipPeer(t *testing.T) { + addr := make([]byte, 32) + peers := []discover.NodeID{ + discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + } + + peersToSkip := new(sync.Map) + peersToSkip.Store(peers[0].String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peers[0].String()) { + t.Errorf("peer not skipped") + } + + if r.SkipPeer(peers[1].String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerExpired checks if a peer to skip is not skipped +// after RequestTimeout has passed. 
+func TestRequestSkipPeerExpired(t *testing.T) { + addr := make([]byte, 32) + peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if r.SkipPeer(peer.String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerPermanent checks if a peer to skip is not skipped +// after RequestTimeout is not skipped if it is set for a permanent skipping +// by value to peersToSkip map is not time.Duration. +func TestRequestSkipPeerPermanent(t *testing.T) { + addr := make([]byte, 32) + peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), true) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } +} diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go index fab638c9e..538502605 100644 --- a/swarm/network/priorityqueue/priorityqueue.go +++ b/swarm/network/priorityqueue/priorityqueue.go @@ -28,10 +28,13 @@ package priorityqueue import ( "context" "errors" + + 
"github.com/ethereum/go-ethereum/log" ) var ( - errContention = errors.New("queue contention") + ErrContention = errors.New("contention") + errBadPriority = errors.New("bad priority") wakey = struct{}{} @@ -39,7 +42,7 @@ var ( // PriorityQueue is the basic structure type PriorityQueue struct { - queues []chan interface{} + Queues []chan interface{} wakeup chan struct{} } @@ -50,27 +53,29 @@ func New(n int, l int) *PriorityQueue { queues[i] = make(chan interface{}, l) } return &PriorityQueue{ - queues: queues, + Queues: queues, wakeup: make(chan struct{}, 1), } } // Run is a forever loop popping items from the queues func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) { - top := len(pq.queues) - 1 + top := len(pq.Queues) - 1 p := top READ: for { - q := pq.queues[p] + q := pq.Queues[p] select { case <-ctx.Done(): return case x := <-q: + log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p])) f(x) p = top default: if p > 0 { p-- + log.Trace("priority.queue p > 0", "p", p) continue READ } p = top @@ -78,6 +83,7 @@ READ: case <-ctx.Done(): return case <-pq.wakeup: + log.Trace("priority.queue wakeup", "p", p) } } } @@ -85,23 +91,15 @@ READ: // Push pushes an item to the appropriate queue specified in the priority argument // if context is given it waits until either the item is pushed or the Context aborts -// otherwise returns errContention if the queue is full -func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error { - if p < 0 || p >= len(pq.queues) { +func (pq *PriorityQueue) Push(x interface{}, p int) error { + if p < 0 || p >= len(pq.Queues) { return errBadPriority } - if ctx == nil { - select { - case pq.queues[p] <- x: - default: - return errContention - } - } else { - select { - case pq.queues[p] <- x: - case <-ctx.Done(): - return ctx.Err() - } + log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p])) + select { + case pq.Queues[p] <- x: + default: + return ErrContention } 
select { case pq.wakeup <- wakey: diff --git a/swarm/network/priorityqueue/priorityqueue_test.go b/swarm/network/priorityqueue/priorityqueue_test.go index cd54250f8..ed8b575c2 100644 --- a/swarm/network/priorityqueue/priorityqueue_test.go +++ b/swarm/network/priorityqueue/priorityqueue_test.go @@ -30,7 +30,7 @@ func TestPriorityQueue(t *testing.T) { results = append(results, v.(string)) wg.Done() }) - pq.Push(context.Background(), "2.0", 2) + pq.Push("2.0", 2) wg.Wait() if results[0] != "2.0" { t.Errorf("expected first result %q, got %q", "2.0", results[0]) @@ -66,7 +66,7 @@ Loop: { priorities: []int{0, 0, 0}, values: []string{"0.0", "0.0", "0.1"}, - errors: []error{nil, nil, errContention}, + errors: []error{nil, nil, ErrContention}, }, } { var results []string @@ -74,7 +74,7 @@ Loop: pq := New(3, 2) wg.Add(len(tc.values)) for j, value := range tc.values { - err := pq.Push(nil, value, tc.priorities[j]) + err := pq.Push(value, tc.priorities[j]) if tc.errors != nil && err != tc.errors[j] { t.Errorf("expected push error %v, got %v", tc.errors[j], err) continue Loop diff --git a/swarm/network/simulation/simulation.go b/swarm/network/simulation/simulation.go index 74f9d98ee..096f7322c 100644 --- a/swarm/network/simulation/simulation.go +++ b/swarm/network/simulation/simulation.go @@ -94,7 +94,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) { } s.Net = simulations.NewNetwork( - adapters.NewSimAdapter(adapterServices), + adapters.NewTCPAdapter(adapterServices), &simulations.NetworkConfig{ID: "0"}, ) @@ -164,17 +164,6 @@ var maxParallelCleanups = 10 func (s *Simulation) Close() { close(s.done) - // Close all connections before calling the Network Shutdown. - // It is possible that p2p.Server.Stop will block if there are - // existing connections. 
- for _, c := range s.Net.Conns { - if c.Up { - s.Net.Disconnect(c.One, c.Other) - } - } - s.shutdownWG.Wait() - s.Net.Shutdown() - sem := make(chan struct{}, maxParallelCleanups) s.mu.RLock() cleanupFuncs := make([]func(), len(s.cleanupFuncs)) @@ -206,6 +195,9 @@ func (s *Simulation) Close() { } close(s.runC) } + + s.shutdownWG.Wait() + s.Net.Shutdown() } // Done returns a channel that is closed when the simulation diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 491dc9fd5..e0d776e34 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora return nil, nil, nil, removeDataDir, err } - db := storage.NewDBAPI(localStore) - delivery := NewDelivery(to, db) - streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, nil, removeDataDir, err + } + + delivery := NewDelivery(to, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) teardown := func() { streamer.Close() removeDataDir() @@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } } -func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) { +func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } -func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) { +func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) - rrs.stores[idx].Put(ctx, chunk) + return rrs.stores[idx].Put(ctx, chunk) } func 
(rrs *roundRobinStore) Close() { diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 627352535..d0f27eebc 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -19,12 +19,11 @@ package stream import ( "context" "errors" - "time" - "github.com/ethereum/go-ethereum/common" + "fmt" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/discover" - cp "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -46,39 +45,34 @@ var ( ) type Delivery struct { - db *storage.DBAPI - kad *network.Kademlia - receiveC chan *ChunkDeliveryMsg - getPeer func(discover.NodeID) *Peer + chunkStore storage.SyncChunkStore + kad *network.Kademlia + getPeer func(discover.NodeID) *Peer } -func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery { - d := &Delivery{ - db: db, - kad: kad, - receiveC: make(chan *ChunkDeliveryMsg, deliveryCap), +func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery { + return &Delivery{ + chunkStore: chunkStore, + kad: kad, } - - go d.processReceivedChunks() - return d } // SwarmChunkServer implements Server type SwarmChunkServer struct { deliveryC chan []byte batchC chan []byte - db *storage.DBAPI + chunkStore storage.ChunkStore currentLen uint64 quit chan struct{} } // NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer { +func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - db: db, - quit: make(chan struct{}), + deliveryC: make(chan []byte, deliveryCap), + batchC: make(chan []byte), + chunkStore: chunkStore, + quit: make(chan struct{}), } go s.processDeliveries() return s @@ -123,13 +117,11 @@ func (s 
*SwarmChunkServer) Close() { // GetData retrives chunk data from db store func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.chunkStore.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // RetrieveRequestMsg is the protocol msg for chunk retrieve requests @@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) - if chunk.ReqC != nil { - if created { - if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { - log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) - chunk.SetErrored(storage.ErrChunkForward) - return nil - } + + var cancel func() + // TODO: do something with this hardcoded timeout, maybe use TTL in the future + ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout) + + go func() { + select { + case <-ctx.Done(): + case <-streamer.quit: } - go func() { - var osp opentracing.Span - ctx, osp = spancontext.StartSpan( - ctx, - "waiting.delivery") - defer osp.Finish() - - t := time.NewTimer(10 * time.Minute) - defer t.Stop() - - log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created) - start := time.Now() - select { - case <-chunk.ReqC: - log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start)) - case <-t.C: - log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr) - chunk.SetErrored(storage.ErrChunkTimeout) - return - } - chunk.SetErrored(nil) - - if req.SkipCheck { - err := sp.Deliver(ctx, chunk, s.priority) 
- if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) - sp.Drop(err) - } + cancel() + }() + + go func() { + chunk, err := d.chunkStore.Get(ctx, req.Addr) + if err != nil { + log.Warn("ChunkStore.Get can not retrieve chunk", "err", err) + return + } + if req.SkipCheck { + err = sp.Deliver(ctx, chunk, s.priority) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - streamer.deliveryC <- chunk.Addr[:] - }() - return nil - } - // TODO: call the retrieve function of the outgoing syncer - if req.SkipCheck { - log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr) - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) + return } - return sp.Deliver(ctx, chunk, s.priority) - } - streamer.deliveryC <- chunk.Addr[:] + select { + case streamer.deliveryC <- chunk.Address()[:]: + case <-streamer.quit: + } + + }() + return nil } @@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } +// TODO: Fix context SNAFU func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { var osp opentracing.Span ctx, osp = spancontext.StartSpan( @@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch "chunk.delivery") defer osp.Finish() - req.peer = sp - d.receiveC <- req - return nil -} + processReceivedChunksCount.Inc(1) -func (d *Delivery) processReceivedChunks() { -R: - for req := range d.receiveC { - processReceivedChunksCount.Inc(1) - - if len(req.SData) > cp.DefaultSize+8 { - log.Warn("received chunk is bigger than expected", "len", len(req.SData)) - continue R - } - - // this should be has locally - chunk, err := d.db.Get(context.TODO(), req.Addr) - if err == nil { - continue R - } - if err != storage.ErrFetching { - log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, 
"chunk", chunk) - continue R - } - select { - case <-chunk.ReqC: - log.Error("someone else delivered?", "hash", chunk.Addr.Hex()) - continue R - default: - } - - chunk.SData = req.SData - d.db.Put(context.TODO(), chunk) - - go func(req *ChunkDeliveryMsg) { - err := chunk.WaitToStore() + go func() { + req.peer = sp + err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) + if err != nil { if err == storage.ErrChunkInvalid { + // we removed this log because it spams the logs + // TODO: Enable this log line + // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, ) req.peer.Drop(err) } - }(req) - } + } + }() + return nil } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { - var success bool - var err error +func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { requestFromPeersCount.Inc(1) + var sp *Peer + spID := req.Source - d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool { - spId := p.ID() - for _, p := range peersToSkip { - if p == spId { - log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId) + if spID != nil { + sp = d.getPeer(*spID) + if sp == nil { + return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) + } + } else { + d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { + id := p.ID() + // TODO: skip light nodes that do not accept retrieve requests + if req.SkipPeer(id.String()) { + log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) return true } - } - sp := d.getPeer(spId) + sp = d.getPeer(id) + if sp == nil { + log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) + return true + } + spID = &id + return false + }) if sp == nil { - log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) - return true + return nil, nil, 
errors.New("no peer found") } - err = sp.SendPriority(ctx, &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: skipCheck, - }, Top) - if err != nil { - return true - } - requestFromPeersEachCount.Inc(1) - success = true - return false - }) - if success { - return nil } - return errors.New("no peer found") + + err := sp.SendPriority(ctx, &RetrieveRequestMsg{ + Addr: req.Addr, + SkipCheck: req.SkipCheck, + }, Top) + if err != nil { + return nil, nil, err + } + requestFromPeersEachCount.Inc(1) + + return spID, sp.quit, nil } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 972cc859a..ece54d4ee 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) { peerID := tester.IDs[0] - streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true) + ctx := context.Background() + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + streamer.delivery.RequestFromPeers(ctx, req) err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { { Code: 5, Msg: &RetrieveRequestMsg{ - Addr: chunk.Addr[:], + Addr: chunk.Address()[:], }, Peer: peerID, }, @@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { }) hash := storage.Address(hash0[:]) - chunk := storage.NewChunk(hash, nil) - chunk.SData = hash - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk := storage.NewChunk(hash, hash) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } hash = storage.Address(hash1[:]) - chunk = storage.NewChunk(hash, nil) - chunk.SData = 
hash1[:] - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk = storage.NewChunk(hash, hash1[:]) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { chunkKey := hash0[:] chunkData := hash1[:] - chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey) - - if !created { - t.Fatal("chunk already exists") - } - select { - case <-chunk.ReqC: - t.Fatal("chunk is already received") - default: - } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { }, }, p2ptest.Exchange{ - Label: "ChunkDeliveryRequest message", + Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ { Code: 6, @@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { if err != nil { t.Fatalf("Expected no error, got %v", err) } - - timeout := time.NewTimer(1 * time.Second) - - select { - case <-timeout.C: - t.Fatal("timeout receiving chunk") - case <-chunk.ReqC: + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // wait for the chunk to get stored + storedChunk, err := localStore.Get(ctx, chunkKey) + for err != nil { + select { + case <-ctx.Done(): + t.Fatalf("Chunk is not in localstore after timeout, err: %v", err) + default: + } + storedChunk, err = localStore.Get(ctx, chunkKey) + time.Sleep(50 * time.Millisecond) } - storedChunk, err := localStore.Get(context.TODO(), chunkKey) if err != nil { t.Fatalf("Expected no error, got %v", err) } - if !bytes.Equal(storedChunk.SData, chunkData) { + if !bytes.Equal(storedChunk.Data(), chunkData) { t.Fatal("Retrieved chunk has different data than original") } @@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, 
conns, chunkCount int, skipCheck store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) @@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - id := ctx.Config.ID addr := network.NewAddrFromNodeID(id) store, datadir, err := createTestLocalStorageForID(id, addr) @@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = 
network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, DoSync: true, SyncUpdateDelay: 0, }) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index f4294134b..452aaca76 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -38,13 +38,18 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) -func TestIntervals(t *testing.T) { +func TestIntervalsLive(t *testing.T) { testIntervals(t, true, nil, false) - testIntervals(t, false, NewRange(9, 26), false) - testIntervals(t, true, NewRange(9, 26), false) - testIntervals(t, true, nil, true) +} + +func TestIntervalsHistory(t *testing.T) { + testIntervals(t, false, NewRange(9, 26), false) testIntervals(t, false, NewRange(9, 26), true) +} + +func TestIntervalsLiveAndHistory(t *testing.T) { + testIntervals(t, true, NewRange(9, 26), false) testIntervals(t, true, NewRange(9, 26), true) } @@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { os.RemoveAll(datadir) } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := 
NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { - return newTestExternalClient(db), nil + return newTestExternalClient(netStore), nil }) r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil @@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() storer := nodeIDs[0] @@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { liveErrC := make(chan error) historyErrC := make(chan error) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - log.Error("WaitKademlia error: %v", "err", err) - return err - } - log.Debug("Watching for disconnections") disconnections := sim.PeerEvents( context.Background(), @@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), ) + err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) + if err != nil { + return err + } + go func() { for d := range disconnections { if d.Error != nil { @@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var liveHashesChan chan []byte liveHashesChan, err = getHashes(ctx, registry, 
storer, NewStream(externalStreamName, "", true)) if err != nil { - log.Error("Subscription error: %v", "err", err) + log.Error("get hashes", "err", err) return } i := externalStreamSessionAt @@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var historyHashesChan chan []byte historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false)) if err != nil { + log.Error("get hashes", "err", err) return } @@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { } }() - err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) - if err != nil { - return err - } if err := <-liveErrC; err != nil { return err } @@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error { type testExternalClient struct { hashes chan []byte - db *storage.DBAPI + store storage.SyncChunkStore enableNotificationsC chan struct{} } -func newTestExternalClient(db *storage.DBAPI) *testExternalClient { +func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient { return &testExternalClient{ hashes: make(chan []byte), - db: db, + store: store, enableNotificationsC: make(chan struct{}), } } -func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(ctx, hash) - if chunk.ReqC == nil { +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { + wait := c.store.FetchFunc(ctx, storage.Address(hash)) + if wait == nil { return nil } - c.hashes <- hash - //NOTE: This was failing on go1.9.x with a deadlock. 
- //Sometimes this function would just block - //It is commented now, but it may be well worth after the chunk refactor - //to re-enable this and see if the problem has been addressed - /* - return func() { - return chunk.WaitToStore() + select { + case c.hashes <- hash: + case <-ctx.Done(): + log.Warn("testExternalClient NeedData context", "err", ctx.Err()) + return func(_ context.Context) error { + return ctx.Err() } - */ - return nil + } + return wait } func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index a19f63589..2e1a81e82 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -18,9 +18,7 @@ package stream import ( "context" - "errors" "fmt" - "sync" "time" "github.com/ethereum/go-ethereum/metrics" @@ -31,6 +29,8 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) +var syncBatchTimeout = 30 * time.Second + // Stream defines a unique stream identifier. type Stream struct { // Name is used for Client and Server functions identification. 
@@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e go func() { if err := p.SendOfferedHashes(os, from, to); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() @@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e } go func() { if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() } @@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if err != nil { return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err) } - wg := sync.WaitGroup{} + + ctr := 0 + errC := make(chan error) + ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) + + ctx = context.WithValue(ctx, "source", p.ID().String()) for i := 0; i < len(hashes); i += HashSize { hash := hashes[i : i+HashSize] if wait := c.NeedData(ctx, hash); wait != nil { + ctr++ want.Set(i/HashSize, true) - wg.Add(1) // create request and wait until the chunk data arrives and is stored - go func(w func()) { - w() - wg.Done() + go func(w func(context.Context) error) { + select { + case errC <- w(ctx): + case <-ctx.Done(): + } }(wait) } } - // done := make(chan bool) - // go func() { - // wg.Wait() - // close(done) - // }() - // go func() { - // select { - // case <-done: - // s.next <- s.batchDone(p, req, hashes) - // case <-time.After(1 * time.Second): - // p.Drop(errors.New("timeout waiting for batch to be delivered")) - // } - // }() + go func() { - wg.Wait() + defer cancel() + for i := 0; i < ctr; i++ { + select { + case err := <-errC: + if err != nil { + log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", 
"peer", p.ID(), "err", err) + p.Drop(err) + return + } + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) + return + case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + } + } select { case c.next <- c.batchDone(p, req, hashes): case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) } }() // only send wantedKeysMsg if all missing chunks of the previous batch arrived @@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg c.sessionAt = req.From } from, to := c.nextBatch(req.To + 1) - log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) + log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID()) if from == to { return nil } @@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg To: to, } go func() { + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) select { - case <-time.After(120 * time.Second): - log.Warn("handleOfferedHashesMsg timeout, so dropping peer") - p.Drop(errors.New("handle offered hashes timeout")) - return case err := <-c.next: if err != nil { - log.Warn("c.next dropping peer", "err", err) + log.Warn("c.next error dropping peer", "err", err) p.Drop(err) return } case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) err := p.SendPriority(ctx, msg, c.priority) if err != nil { - log.Warn("SendPriority err, so dropping peer", "err", err) - p.Drop(err) + 
log.Warn("SendPriority error", "err", err) } }() return nil @@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) // launch in go routine since GetBatch blocks until new hashes arrive go func() { if err := p.SendOfferedHashes(s, req.From, req.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "err", err) } }() // go p.SendOfferedHashes(s, req.From, req.To) @@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) if err != nil { return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } - chunk := storage.NewChunk(hash, nil) - chunk.SData = data - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr) - } + chunk := storage.NewChunk(hash, data) if err := p.Deliver(ctx, chunk, s.priority); err != nil { return err } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 80b9ab711..1466a7a9c 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -33,8 +33,6 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) -var sendTimeout = 30 * time.Second - type notFoundError struct { t string s Stream @@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) - p.Send(wmsg.Context, wmsg.Msg) + err := p.Send(wmsg.Context, wmsg.Msg) + if err != nil { + log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) + p.Drop(err) + } }) + + // basic monitoring for pq contention + go func(pq *pq.PriorityQueue) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + var len_maxi int + var cap_maxi int + for k := range pq.Queues { + if len_maxi < 
len(pq.Queues[k]) { + len_maxi = len(pq.Queues[k]) + } + + if cap_maxi < cap(pq.Queues[k]) { + cap_maxi = cap(pq.Queues[k]) + } + } + + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi)) + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi)) + case <-p.quit: + return + } + } + }(p.pq) + go func() { <-p.quit cancel() @@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error { +func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error { var sp opentracing.Span ctx, sp = spancontext.StartSpan( ctx, @@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 defer sp.Finish() msg := &ChunkDeliveryMsg{ - Addr: chunk.Addr, - SData: chunk.SData, + Addr: chunk.Address(), + SData: chunk.Data(), } return p.SendPriority(ctx, msg, priority) } @@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - cctx, cancel := context.WithTimeout(context.Background(), sendTimeout) - defer cancel() wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, } - return p.pq.Push(cctx, wmsg, int(priority)) + err := p.pq.Push(wmsg, int(priority)) + if err == pq.ErrContention { + log.Warn("dropping peer on priority queue contention", "peer", p.ID()) + p.Drop(err) + } + return err } // SendOfferedHashes sends OfferedHashesMsg protocol msg diff --git a/swarm/network/stream/snapshot_retrieval_test.go 
b/swarm/network/stream/snapshot_retrieval_test.go index 4ff947b21..19eaad34e 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := 
NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 0, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucketKeyFileStore = simulation.BucketKey("filestore") bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 313019d6a..7cd09099c 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "runtime" "sync" "testing" "time" @@ -39,15 +40,20 @@ import ( mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) -const testMinProxBinSize = 2 const MaxTimeout = 600 type synctestConfig struct { - addrs [][]byte - hashes []storage.Address - idToChunksMap map[discover.NodeID][]int - chunksToNodesMap map[string][]int - addrToIDMap map[string]discover.NodeID + addrs [][]byte + hashes []storage.Address + idToChunksMap map[discover.NodeID][]int + //chunksToNodesMap map[string][]int + addrToIDMap map[string]discover.NodeID +} + +// Tests in this file should not request chunks from peers. +// This function will panic indicating that there is a problem if request has been made. +func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { + panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String())) } //This test is a syncing test for nodes. @@ -58,6 +64,9 @@ type synctestConfig struct { //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. 
func TestSyncingViaGlobalSync(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { @@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) { } func TestSyncingViaDirectSubscribe(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) - err := testSyncingViaDirectSubscribe(*chunks, *nodes) + err := testSyncingViaDirectSubscribe(t, *chunks, *nodes) if err != nil { t.Fatal(err) } @@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) { for _, chnk := range chnkCnt { for _, n := range nodeCnt { log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n)) - err := testSyncingViaDirectSubscribe(chnk, n) + err := testSyncingViaDirectSubscribe(t, chnk, n) if err != nil { t.Fatal(err) } @@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ 
DoSync: true, SyncUpdateDelay: 3 * time.Second, }) bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { t.Fatal(err) } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { conf.hashes = append(conf.hashes, hashes...) mapKeysToNodes(conf) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - return err - } - // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. 
allSuccess := false @@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { }() } for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { if result.Error != nil { t.Fatal(result.Error) } + log.Info("Simulation ended") } /* @@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. The snapshot should have 'streamer' in its service list. */ -func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { +func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { @@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + r := 
NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, }) defer sim.Close() - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() conf := &synctestConfig{} @@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return err } + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { // or until the timeout is reached. 
allSuccess := false for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return result.Error } - log.Info("Simulation terminated") + log.Info("Simulation ended") return nil } @@ -462,10 +527,9 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //iterate over each bin and solicit needed subscription to bins kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool { //identify begin and start index of the bin(s) we want to subscribe to - histRange := &Range{} subCnt++ - err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top) + err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High) if err != nil { log.Error(fmt.Sprintf("Error in RequestSubsciption! 
%v", err)) return false @@ -478,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //map chunk keys to addresses which are responsible func mapKeysToNodes(conf *synctestConfig) { - kmap := make(map[string][]int) nodemap := make(map[string][]int) //build a pot for chunk hashes np := pot.NewPot(nil, 0) @@ -487,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) { indexmap[string(a)] = i np, _, _ = pot.Add(np, a, pof) } + + var kadMinProxSize = 2 + + ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs) + //for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes)) for i := 0; i < len(conf.hashes); i++ { - pl := 256 //highest possible proximity - var nns []int + var a []byte np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool { - a := val.([]byte) - if pl < 256 && pl != po { - return false - } - if pl == 256 || pl == po { - log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)])) - nns = append(nns, indexmap[string(a)]) - nodemap[string(a)] = append(nodemap[string(a)], i) - } - if pl == 256 && len(nns) >= testMinProxBinSize { - //maxProxBinSize has been reached at this po, so save it - //we will add all other nodes at the same po - pl = po - } - return true + // take the first address + a = val.([]byte) + return false }) - kmap[string(conf.hashes[i])] = nns + + nns := ppmap[common.Bytes2Hex(a)].NNSet + nns = append(nns, a) + + for _, p := range nns { + nodemap[string(p)] = append(nodemap[string(p)], i) + } } for addr, chunks := range nodemap { //this selects which chunks are expected to be found with the given node conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks } log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap)) - conf.chunksToNodesMap = kmap } //upload a file(chunks) to a single local node store diff --git a/swarm/network/stream/stream.go 
b/swarm/network/stream/stream.go index deffdfc3f..1f1f34b7b 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -32,10 +32,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -43,8 +41,8 @@ const ( Mid High Top - PriorityQueue // number of queues - PriorityQueueCap = 32 // queue capacity + PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top + PriorityQueueCap = 128 // queue capacity HashSize = 32 ) @@ -73,7 +71,7 @@ type RegistryOptions struct { } // NewRegistry is Streamer constructor -func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry { +func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { if options == nil { options = &RegistryOptions{} } @@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) { - return NewSwarmChunkServer(delivery.db), nil + return NewSwarmChunkServer(delivery.chunkStore), nil }) streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live)) + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) }) - RegisterSwarmSyncerServer(streamer, db) - 
RegisterSwarmSyncerClient(streamer, db) + RegisterSwarmSyncerServer(streamer, syncChunkStore) + RegisterSwarmSyncerClient(streamer, syncChunkStore) if options.DoSync { // latestIntC function ensures that @@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { return peer.Send(context.TODO(), msg) } -func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error { - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "registry.retrieve") - defer sp.Finish() - - return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck) -} - func (r *Registry) NodeInfo() interface{} { return nil } @@ -557,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData(context.Context, []byte) func() + NeedData(context.Context, []byte) func(context.Context) error BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 7523860c9..06e96b9a9 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -80,15 +80,17 @@ func newTestClient(t string) *testClient { } } -func (self *testClient) NeedData(ctx context.Context, hash []byte) func() { +func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { self.receivedHashes[string(hash)] = hash if bytes.Equal(hash, hash0[:]) { - return func() { + return func(context.Context) error { <-self.wait0 + return nil } } else if bytes.Equal(hash, hash2[:]) { - return func() { + return func(context.Context) error { <-self.wait2 + return nil } } return nil diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index d7febe4a3..e9811a678 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -28,7 +28,6 @@ import ( ) const ( - // BatchSize = 2 
BatchSize = 128 ) @@ -38,35 +37,37 @@ const ( // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { po uint8 - db *storage.DBAPI + store storage.SyncChunkStore sessionAt uint64 start uint64 + live bool quit chan struct{} } // NewSwarmSyncerServer is contructor for SwarmSyncerServer -func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) { - sessionAt := db.CurrentBucketStorageIndex(po) +func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { + sessionAt := syncChunkStore.BinIndex(po) var start uint64 if live { start = sessionAt } return &SwarmSyncerServer{ po: po, - db: db, + store: syncChunkStore, sessionAt: sessionAt, start: start, + live: live, quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(live, po, db) + return NewSwarmSyncerServer(live, po, syncChunkStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() { close(s.quit) } -// GetSection retrieves the actual chunk from localstore +// GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.store.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // GetBatch retrieves the next batch of hashes from the dbstore func (s 
*SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { var batch []byte i := 0 - if from == 0 { - from = s.start - } - if to <= from || from >= s.sessionAt { - to = math.MaxUint64 + if s.live { + if from == 0 { + from = s.start + } + if to <= from || from >= s.sessionAt { + to = math.MaxUint64 + } + } else { + if (to < from && to != 0) || from > s.sessionAt { + return nil, 0, 0, nil, nil + } + if to == 0 || to > s.sessionAt { + to = s.sessionAt + } } + var ticker *time.Ticker defer func() { if ticker != nil { @@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 } metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) - err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool { - batch = append(batch, addr[:]...) + err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool { + batch = append(batch, key[:]...) i++ to = idx return i < BatchSize @@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 wait = true } - log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po)) + log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po)) return batch, from, to, nil, nil } @@ -146,28 +155,26 @@ type SwarmSyncerClient struct { sessionReader storage.LazySectionReader retrieveC chan *storage.Chunk storeC chan *storage.Chunk - db *storage.DBAPI + store storage.SyncChunkStore // chunker storage.Chunker - currentRoot storage.Address - requestFunc func(chunk *storage.Chunk) - end, start uint64 - peer *Peer - ignoreExistingRequest bool - stream Stream + currentRoot storage.Address + requestFunc func(chunk *storage.Chunk) + end, start uint64 + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable 
data exchange syncer -func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - db: db, - peer: p, - ignoreExistingRequest: ignoreExistingRequest, - stream: stream, + store: store, + peer: p, + stream: stream, }, nil } // // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { +// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { // retrieveC := make(storage.Chunk, chunksCap) // RunChunkRequestor(p, retrieveC) // storeC := make(storage.Chunk, chunksCap) @@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) }) } // NeedData -func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) { - chunk, _ := s.db.GetOrCreateRequest(ctx, key) - // TODO: we may want to request from this peer anyway even if the request exists - - // ignoreExistingRequest is temporary commented out until its 
functionality is verified. - // For now, this optimization can be disabled. - if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) { - return nil - } - // create request and wait until the chunk data arrives and is stored - return func() { - chunk.WaitToStore() - } +func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { + return s.store.FetchFunc(ctx, key) } // BatchDone diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index f72aa3444..469d520f8 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck } } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) - bucket.Store(bucketKeyDB, db) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyDB, netStore) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + bucket.Store(bucketKeyDelivery, delivery) - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) return r, cleanup, nil @@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck if !ok { return fmt.Errorf("No DB") } - db := item.(*storage.DBAPI) - db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { + netStore := 
item.(*storage.NetStore) + netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { hashes[i] = append(hashes[i], addr) totalHashes++ hashCounts[i]++ @@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck if !ok { return fmt.Errorf("No DB") } - db := item.(*storage.DBAPI) - chunk, err := db.Get(ctx, key) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { - continue + db := item.(*storage.NetStore) + _, err := db.Get(ctx, key) + if err == nil { + found++ } - // needed for leveldb not to be closed? - // chunk.WaitToStore() - found++ } } log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total) diff --git a/swarm/network_test.go b/swarm/network_test.go index 9bc6143d8..86a904339 100644 --- a/swarm/network_test.go +++ b/swarm/network_test.go @@ -87,10 +87,10 @@ func TestSwarmNetwork(t *testing.T) { }, }, { - name: "100_nodes", + name: "50_nodes", steps: []testSwarmNetworkStep{ { - nodeCount: 100, + nodeCount: 50, }, }, options: &testSwarmNetworkOptions{ @@ -99,10 +99,10 @@ func TestSwarmNetwork(t *testing.T) { disabled: !*longrunning, }, { - name: "100_nodes_skip_check", + name: "50_nodes_skip_check", steps: []testSwarmNetworkStep{ { - nodeCount: 100, + nodeCount: 50, }, }, options: &testSwarmNetworkOptions{ @@ -287,6 +287,7 @@ func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwa config.Init(privkey) config.DeliverySkipCheck = o.SkipCheck + config.Port = "" swarm, err := NewSwarm(config, nil) if err != nil { diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 6d805b8e2..40292e88f 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -22,10 +22,9 @@ import ( "fmt" "io" "sync" - "time" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" 
"github.com/ethereum/go-ethereum/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" @@ -67,7 +66,6 @@ The hashing itself does use extra copies and allocation though, since it does ne var ( errAppendOppNotSuported = errors.New("Append operation not supported") - errOperationTimedOut = errors.New("operation timed out") ) type ChunkerParams struct { @@ -133,7 +131,7 @@ type TreeChunker struct { func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *LazyChunkReader { jp := &JoinerParams{ ChunkerParams: ChunkerParams{ - chunkSize: chunk.DefaultSize, + chunkSize: ch.DefaultSize, hashSize: int64(len(addr)), }, addr: addr, @@ -153,7 +151,7 @@ func TreeSplit(ctx context.Context, data io.Reader, size int64, putter Putter) ( tsp := &TreeSplitterParams{ SplitterParams: SplitterParams{ ChunkerParams: ChunkerParams{ - chunkSize: chunk.DefaultSize, + chunkSize: ch.DefaultSize, hashSize: putter.RefSize(), }, reader: data, @@ -201,11 +199,6 @@ func NewTreeSplitter(params *TreeSplitterParams) *TreeChunker { return tc } -// String() for pretty printing -func (c *Chunk) String() string { - return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", c.Addr.Log(), c.Size, len(c.SData)) -} - type hashJob struct { key Address chunk []byte @@ -236,7 +229,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context. panic("chunker must be initialised") } - tc.runWorker() + tc.runWorker(ctx) depth := 0 treeSize := tc.chunkSize @@ -251,7 +244,7 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context. 
// this waitgroup member is released after the root hash is calculated tc.wg.Add(1) //launch actual recursive function passing the waitgroups - go tc.split(depth, treeSize/tc.branches, key, tc.dataSize, tc.wg) + go tc.split(ctx, depth, treeSize/tc.branches, key, tc.dataSize, tc.wg) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -267,14 +260,14 @@ func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context. if err != nil { return nil, nil, err } - case <-time.NewTimer(splitTimeout).C: - return nil, nil, errOperationTimedOut + case <-ctx.Done(): + return nil, nil, ctx.Err() } return key, tc.putter.Wait, nil } -func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) { +func (tc *TreeChunker) split(ctx context.Context, depth int, treeSize int64, addr Address, size int64, parentWg *sync.WaitGroup) { // @@ -321,10 +314,10 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64 secSize = treeSize } // the hash of that data - subTreeKey := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize] + subTreeAddress := chunk[8+i*tc.hashSize : 8+(i+1)*tc.hashSize] childrenWg.Add(1) - tc.split(depth-1, treeSize/tc.branches, subTreeKey, secSize, childrenWg) + tc.split(ctx, depth-1, treeSize/tc.branches, subTreeAddress, secSize, childrenWg) i++ pos += treeSize @@ -336,7 +329,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64 worker := tc.getWorkerCount() if int64(len(tc.jobC)) > worker && worker < ChunkProcessors { - tc.runWorker() + tc.runWorker(ctx) } select { @@ -345,7 +338,7 @@ func (tc *TreeChunker) split(depth int, treeSize int64, addr Address, size int64 } } -func (tc *TreeChunker) runWorker() { +func (tc *TreeChunker) runWorker(ctx context.Context) { tc.incrementWorkerCount() go func() { defer tc.decrementWorkerCount() @@ -357,7 +350,7 @@ func (tc *TreeChunker) runWorker() { return } - h, err := 
tc.putter.Put(tc.ctx, job.chunk) + h, err := tc.putter.Put(ctx, job.chunk) if err != nil { tc.errC <- err return @@ -377,8 +370,8 @@ func (tc *TreeChunker) Append() (Address, func(), error) { // LazyChunkReader implements LazySectionReader type LazyChunkReader struct { - Ctx context.Context - key Address // root key + ctx context.Context + addr Address // root address chunkData ChunkData off int64 // offset chunkSize int64 // inherit from chunker @@ -390,18 +383,18 @@ type LazyChunkReader struct { func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader { return &LazyChunkReader{ - key: tc.addr, + addr: tc.addr, chunkSize: tc.chunkSize, branches: tc.branches, hashSize: tc.hashSize, depth: tc.depth, getter: tc.getter, - Ctx: tc.ctx, + ctx: tc.ctx, } } func (r *LazyChunkReader) Context() context.Context { - return r.Ctx + return r.ctx } // Size is meant to be called on the LazySectionReader @@ -415,23 +408,24 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e "lcr.size") defer sp.Finish() - log.Debug("lazychunkreader.size", "key", r.key) + log.Debug("lazychunkreader.size", "addr", r.addr) if r.chunkData == nil { - chunkData, err := r.getter.Get(cctx, Reference(r.key)) + chunkData, err := r.getter.Get(cctx, Reference(r.addr)) if err != nil { return 0, err } - if chunkData == nil { - select { - case <-quitC: - return 0, errors.New("aborted") - default: - return 0, fmt.Errorf("root chunk not found for %v", r.key.Hex()) - } - } r.chunkData = chunkData + s := r.chunkData.Size() + log.Debug("lazychunkreader.size", "key", r.addr, "size", s) + if s < 0 { + return 0, errors.New("corrupt size") + } + return int64(s), nil } - return r.chunkData.Size(), nil + s := r.chunkData.Size() + log.Debug("lazychunkreader.size", "key", r.addr, "size", s) + + return int64(s), nil } // read at can be called numerous times @@ -443,7 +437,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { var sp opentracing.Span var cctx 
context.Context cctx, sp = spancontext.StartSpan( - r.Ctx, + r.ctx, "lcr.read") defer sp.Finish() @@ -460,7 +454,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { quitC := make(chan bool) size, err := r.Size(cctx, quitC) if err != nil { - log.Error("lazychunkreader.readat.size", "size", size, "err", err) + log.Debug("lazychunkreader.readat.size", "size", size, "err", err) return 0, err } @@ -481,7 +475,7 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { length *= r.chunkSize } wg.Add(1) - go r.join(cctx, b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) + go r.join(b, off, off+length, depth, treeSize/r.branches, r.chunkData, &wg, errC, quitC) go func() { wg.Wait() close(errC) @@ -489,20 +483,22 @@ func (r *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { err = <-errC if err != nil { - log.Error("lazychunkreader.readat.errc", "err", err) + log.Debug("lazychunkreader.readat.errc", "err", err) close(quitC) return 0, err } if off+int64(len(b)) >= size { + log.Debug("lazychunkreader.readat.return at end", "size", size, "off", off) return int(size - off), io.EOF } + log.Debug("lazychunkreader.readat.errc", "buff", len(b)) return len(b), nil } -func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { +func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunkData ChunkData, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // find appropriate block level - for chunkData.Size() < treeSize && depth > r.depth { + for chunkData.Size() < uint64(treeSize) && depth > r.depth { treeSize /= r.branches depth-- } @@ -545,19 +541,19 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in } wg.Add(1) go func(j int64) { - childKey := 
chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] - chunkData, err := r.getter.Get(ctx, Reference(childKey)) + childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize] + chunkData, err := r.getter.Get(r.ctx, Reference(childAddress)) if err != nil { - log.Error("lazychunkreader.join", "key", fmt.Sprintf("%x", childKey), "err", err) + log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err) select { - case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childKey)): + case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)): case <-quitC: } return } if l := len(chunkData); l < 9 { select { - case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childKey), l): + case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l): case <-quitC: } return @@ -565,26 +561,26 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in if soff < off { soff = off } - r.join(ctx, b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) + r.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/r.branches, chunkData, wg, errC, quitC) }(i) } //for } // Read keeps a cursor so cannot be called simulateously, see ReadAt func (r *LazyChunkReader) Read(b []byte) (read int, err error) { - log.Debug("lazychunkreader.read", "key", r.key) + log.Debug("lazychunkreader.read", "key", r.addr) metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1) read, err = r.ReadAt(b, r.off) if err != nil && err != io.EOF { - log.Error("lazychunkreader.readat", "read", read, "err", err) + log.Debug("lazychunkreader.readat", "read", read, "err", err) metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1) } 
metrics.GetOrRegisterCounter("lazychunkreader.read.bytes", nil).Inc(int64(read)) r.off += int64(read) - return + return read, err } // completely analogous to standard SectionReader implementation @@ -592,7 +588,7 @@ var errWhence = errors.New("Seek: invalid whence") var errOffset = errors.New("Seek: invalid offset") func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { - log.Debug("lazychunkreader.seek", "key", r.key, "offset", offset) + log.Debug("lazychunkreader.seek", "key", r.addr, "offset", offset) switch whence { default: return 0, errWhence @@ -607,7 +603,7 @@ func (r *LazyChunkReader) Seek(offset int64, whence int) (int64, error) { return 0, fmt.Errorf("can't get size: %v", err) } } - offset += r.chunkData.Size() + offset += int64(r.chunkData.Size()) } if offset < 0 { diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index dbcc8700d..db719ca04 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -21,7 +21,6 @@ import ( "context" "crypto/rand" "encoding/binary" - "errors" "fmt" "io" "testing" @@ -43,27 +42,8 @@ type chunkerTester struct { t test } -// fakeChunkStore doesn't store anything, just implements the ChunkStore interface -// It can be used to inject into a hasherStore if you don't want to actually store data just do the -// hashing -type fakeChunkStore struct { -} - -// Put doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Put(context.Context, *Chunk) { -} - -// Gut doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Get(context.Context, Address) (*Chunk, error) { - return nil, errors.New("FakeChunkStore doesn't support Get") -} - -// Close doesn't store anything it is just here to implement ChunkStore -func (f *fakeChunkStore) Close() { -} - -func newTestHasherStore(chunkStore ChunkStore, hash string) *hasherStore { - return NewHasherStore(chunkStore, MakeHashFunc(hash), false) +func 
newTestHasherStore(store ChunkStore, hash string) *hasherStore { + return NewHasherStore(store, MakeHashFunc(hash), false) } func testRandomBrokenData(n int, tester *chunkerTester) { @@ -82,11 +62,12 @@ func testRandomBrokenData(n int, tester *chunkerTester) { putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash) expectedError := fmt.Errorf("Broken reader") - addr, _, err := TreeSplit(context.TODO(), brokendata, int64(n), putGetter) + ctx := context.Background() + key, _, err := TreeSplit(ctx, brokendata, int64(n), putGetter) if err == nil || err.Error() != expectedError.Error() { tester.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err) } - tester.t.Logf(" Key = %v\n", addr) + tester.t.Logf(" Address = %v\n", key) } func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) Address { @@ -96,7 +77,7 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) input, found := tester.inputs[uint64(n)] var data io.Reader if !found { - data, input = generateRandomData(n) + data, input = GenerateRandomData(n) tester.inputs[uint64(n)] = input } else { data = io.LimitReader(bytes.NewReader(input), int64(n)) @@ -116,13 +97,13 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) if err != nil { tester.t.Fatalf(err.Error()) } - tester.t.Logf(" Key = %v\n", addr) + tester.t.Logf(" Address = %v\n", addr) err = wait(ctx) if err != nil { tester.t.Fatalf(err.Error()) } - reader := TreeJoin(context.TODO(), addr, putGetter, 0) + reader := TreeJoin(ctx, addr, putGetter, 0) output := make([]byte, n) r, err := reader.Read(output) if r != n || err != io.EOF { @@ -196,14 +177,14 @@ func TestDataAppend(t *testing.T) { input, found := tester.inputs[uint64(n)] var data io.Reader if !found { - data, input = generateRandomData(n) + data, input = GenerateRandomData(n) tester.inputs[uint64(n)] = input } else { data = io.LimitReader(bytes.NewReader(input), int64(n)) } - 
chunkStore := NewMapChunkStore() - putGetter := newTestHasherStore(chunkStore, SHA3Hash) + store := NewMapChunkStore() + putGetter := newTestHasherStore(store, SHA3Hash) ctx := context.TODO() addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) @@ -214,18 +195,17 @@ func TestDataAppend(t *testing.T) { if err != nil { tester.t.Fatalf(err.Error()) } - //create a append data stream appendInput, found := tester.inputs[uint64(m)] var appendData io.Reader if !found { - appendData, appendInput = generateRandomData(m) + appendData, appendInput = GenerateRandomData(m) tester.inputs[uint64(m)] = appendInput } else { appendData = io.LimitReader(bytes.NewReader(appendInput), int64(m)) } - putGetter = newTestHasherStore(chunkStore, SHA3Hash) + putGetter = newTestHasherStore(store, SHA3Hash) newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter) if err != nil { tester.t.Fatalf(err.Error()) @@ -256,18 +236,18 @@ func TestRandomData(t *testing.T) { tester := &chunkerTester{t: t} for _, s := range sizes { - treeChunkerKey := testRandomData(false, SHA3Hash, s, tester) - pyramidChunkerKey := testRandomData(true, SHA3Hash, s, tester) - if treeChunkerKey.String() != pyramidChunkerKey.String() { - tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String()) + treeChunkerAddress := testRandomData(false, SHA3Hash, s, tester) + pyramidChunkerAddress := testRandomData(true, SHA3Hash, s, tester) + if treeChunkerAddress.String() != pyramidChunkerAddress.String() { + tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerAddress.String(), pyramidChunkerAddress.String()) } } for _, s := range sizes { - treeChunkerKey := testRandomData(false, BMTHash, s, tester) - pyramidChunkerKey := testRandomData(true, BMTHash, s, tester) - if treeChunkerKey.String() != pyramidChunkerKey.String() { - tester.t.Fatalf("tree 
chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String()) + treeChunkerAddress := testRandomData(false, BMTHash, s, tester) + pyramidChunkerAddress := testRandomData(true, BMTHash, s, tester) + if treeChunkerAddress.String() != pyramidChunkerAddress.String() { + tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerAddress.String(), pyramidChunkerAddress.String()) } } } @@ -312,12 +292,18 @@ func benchmarkSplitTreeSHA3(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash) + putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash) - _, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter) + ctx := context.Background() + _, wait, err := TreeSplit(ctx, data, int64(n), putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } + } } @@ -325,36 +311,50 @@ func benchmarkSplitTreeBMT(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash) + putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash) - _, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter) + ctx := context.Background() + _, wait, err := TreeSplit(ctx, data, int64(n), putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } } } -func benchmarkSplitPyramidSHA3(n int, t *testing.B) { +func benchmarkSplitPyramidBMT(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash) + putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash) - _, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter) + ctx := context.Background() + _, wait, err := 
PyramidSplit(ctx, data, putGetter, putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } - } } -func benchmarkSplitPyramidBMT(n int, t *testing.B) { +func benchmarkSplitPyramidSHA3(n int, t *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { data := testDataReader(n) - putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash) + putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash) - _, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter) + ctx := context.Background() + _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + if err != nil { + t.Fatalf(err.Error()) + } + err = wait(ctx) if err != nil { t.Fatalf(err.Error()) } @@ -367,10 +367,10 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { data := testDataReader(n) data1 := testDataReader(m) - chunkStore := NewMapChunkStore() - putGetter := newTestHasherStore(chunkStore, SHA3Hash) + store := NewMapChunkStore() + putGetter := newTestHasherStore(store, SHA3Hash) - ctx := context.TODO() + ctx := context.Background() key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) if err != nil { t.Fatalf(err.Error()) @@ -380,7 +380,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { t.Fatalf(err.Error()) } - putGetter = newTestHasherStore(chunkStore, SHA3Hash) + putGetter = newTestHasherStore(store, SHA3Hash) _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter) if err != nil { t.Fatalf(err.Error()) diff --git a/swarm/storage/chunkstore.go b/swarm/storage/chunkstore.go deleted file mode 100644 index 3b4d97a7a..000000000 --- a/swarm/storage/chunkstore.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. 
-// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package storage - -import ( - "context" - "sync" -) - -/* -ChunkStore interface is implemented by : - -- MemStore: a memory cache -- DbStore: local disk/db store -- LocalStore: a combination (sequence of) memStore and dbStore -- NetStore: cloud storage abstraction layer -- FakeChunkStore: dummy store which doesn't store anything just implements the interface -*/ -type ChunkStore interface { - Put(context.Context, *Chunk) // effectively there is no error even if there is an error - Get(context.Context, Address) (*Chunk, error) - Close() -} - -// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory. 
-type MapChunkStore struct { - chunks map[string]*Chunk - mu sync.RWMutex -} - -func NewMapChunkStore() *MapChunkStore { - return &MapChunkStore{ - chunks: make(map[string]*Chunk), - } -} - -func (m *MapChunkStore) Put(ctx context.Context, chunk *Chunk) { - m.mu.Lock() - defer m.mu.Unlock() - m.chunks[chunk.Addr.Hex()] = chunk - chunk.markAsStored() -} - -func (m *MapChunkStore) Get(ctx context.Context, addr Address) (*Chunk, error) { - m.mu.RLock() - defer m.mu.RUnlock() - chunk := m.chunks[addr.Hex()] - if chunk == nil { - return nil, ErrChunkNotFound - } - return chunk, nil -} - -func (m *MapChunkStore) Close() { -} diff --git a/swarm/storage/common.go b/swarm/storage/common.go deleted file mode 100644 index d6352820e..000000000 --- a/swarm/storage/common.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
-package storage - -import ( - "context" - "sync" - - "github.com/ethereum/go-ethereum/swarm/log" -) - -// PutChunks adds chunks to localstore -// It waits for receive on the stored channel -// It logs but does not fail on delivery error -func PutChunks(store *LocalStore, chunks ...*Chunk) { - wg := sync.WaitGroup{} - wg.Add(len(chunks)) - go func() { - for _, c := range chunks { - <-c.dbStoredC - if err := c.GetErrored(); err != nil { - log.Error("chunk store fail", "err", err, "key", c.Addr) - } - wg.Done() - } - }() - for _, c := range chunks { - go store.Put(context.TODO(), c) - } - wg.Wait() -} diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index dc1a3ab35..33133edd7 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -23,16 +23,20 @@ import ( "flag" "fmt" "io" + "io/ioutil" + "os" "sync" "testing" "time" "github.com/ethereum/go-ethereum/log" + ch "github.com/ethereum/go-ethereum/swarm/chunk" colorable "github.com/mattn/go-colorable" ) var ( - loglevel = flag.Int("loglevel", 3, "verbosity of logs") + loglevel = flag.Int("loglevel", 3, "verbosity of logs") + getTimeout = 30 * time.Second ) func init() { @@ -56,47 +60,73 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader } } -func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) { - return mput(store, processors, n, GenerateRandomChunk) +func newLDBStore(t *testing.T) (*LDBStore, func()) { + dir, err := ioutil.TempDir("", "bzz-storage-test") + if err != nil { + t.Fatal(err) + } + log.Trace("memstore.tempdir", "dir", dir) + + ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir) + db, err := NewLDBStore(ldbparams) + if err != nil { + t.Fatal(err) + } + + cleanup := func() { + db.Close() + err := os.RemoveAll(dir) + if err != nil { + t.Fatal(err) + } + } + + return db, cleanup } -func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) { - wg := 
sync.WaitGroup{} - wg.Add(processors) - c := make(chan *Chunk) - for i := 0; i < processors; i++ { +func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error) { + return mput(store, n, GenerateRandomChunk) +} + +func mputChunks(store ChunkStore, chunks ...Chunk) error { + i := 0 + f := func(n int64) Chunk { + chunk := chunks[i] + i++ + return chunk + } + _, err := mput(store, len(chunks), f) + return err +} + +func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) { + // put to localstore and wait for stored channel + // does not check delivery error state + errc := make(chan error) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + for i := int64(0); i < int64(n); i++ { + chunk := f(ch.DefaultSize) go func() { - defer wg.Done() - for chunk := range c { - wg.Add(1) - chunk := chunk - store.Put(context.TODO(), chunk) - go func() { - defer wg.Done() - <-chunk.dbStoredC - }() + select { + case errc <- store.Put(ctx, chunk): + case <-ctx.Done(): } }() + hs = append(hs, chunk) } - fa := f - if _, ok := store.(*MemStore); ok { - fa = func(i int64) *Chunk { - chunk := f(i) - chunk.markAsStored() - return chunk - } - } + + // wait for all chunks to be stored for i := 0; i < n; i++ { - chunk := fa(int64(i)) - hs = append(hs, chunk.Addr) - c <- chunk + err := <-errc + if err != nil { + return nil, err + } } - close(c) - wg.Wait() - return hs + return hs, nil } -func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error { +func mget(store ChunkStore, hs []Address, f func(h Address, chunk Chunk) error) error { wg := sync.WaitGroup{} wg.Add(len(hs)) errc := make(chan error) @@ -104,6 +134,7 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) for _, k := range hs { go func(h Address) { defer wg.Done() + // TODO: write timeout with context chunk, err := store.Get(context.TODO(), h) if err != nil { errc <- err @@ -143,57 +174,54 @@ 
func (r *brokenLimitedReader) Read(buf []byte) (int, error) { return r.lr.Read(buf) } -func generateRandomData(l int) (r io.Reader, slice []byte) { - slice = make([]byte, l) - if _, err := rand.Read(slice); err != nil { - panic("rand error") +func testStoreRandom(m ChunkStore, n int, chunksize int64, t *testing.T) { + chunks, err := mputRandomChunks(m, n, chunksize) + if err != nil { + t.Fatalf("expected no error, got %v", err) } - r = io.LimitReader(bytes.NewReader(slice), int64(l)) - return -} - -func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { - hs := mputRandomChunks(m, processors, n, chunksize) - err := mget(m, hs, nil) + err = mget(m, chunkAddresses(chunks), nil) if err != nil { t.Fatalf("testStore failed: %v", err) } } -func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { - hs := mputRandomChunks(m, processors, n, chunksize) - f := func(h Address, chunk *Chunk) error { - if !bytes.Equal(h, chunk.Addr) { - return fmt.Errorf("key does not match retrieved chunk Key") +func testStoreCorrect(m ChunkStore, n int, chunksize int64, t *testing.T) { + chunks, err := mputRandomChunks(m, n, chunksize) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + f := func(h Address, chunk Chunk) error { + if !bytes.Equal(h, chunk.Address()) { + return fmt.Errorf("key does not match retrieved chunk Address") } hasher := MakeHashFunc(DefaultHash)() - hasher.ResetWithLength(chunk.SData[:8]) - hasher.Write(chunk.SData[8:]) + hasher.ResetWithLength(chunk.SpanBytes()) + hasher.Write(chunk.Payload()) exp := hasher.Sum(nil) if !bytes.Equal(h, exp) { return fmt.Errorf("key is not hash of chunk data") } return nil } - err := mget(m, hs, f) + err = mget(m, chunkAddresses(chunks), f) if err != nil { t.Fatalf("testStore failed: %v", err) } } -func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { - chunks := make([]*Chunk, n) +func benchmarkStorePut(store 
ChunkStore, n int, chunksize int64, b *testing.B) { + chunks := make([]Chunk, n) i := 0 - f := func(dataSize int64) *Chunk { + f := func(dataSize int64) Chunk { chunk := GenerateRandomChunk(dataSize) chunks[i] = chunk i++ return chunk } - mput(store, processors, n, f) + mput(store, n, f) - f = func(dataSize int64) *Chunk { + f = func(dataSize int64) Chunk { chunk := chunks[i] i++ return chunk @@ -204,18 +232,62 @@ func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, for j := 0; j < b.N; j++ { i = 0 - mput(store, processors, n, f) + mput(store, n, f) } } -func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { - hs := mputRandomChunks(store, processors, n, chunksize) +func benchmarkStoreGet(store ChunkStore, n int, chunksize int64, b *testing.B) { + chunks, err := mputRandomChunks(store, n, chunksize) + if err != nil { + b.Fatalf("expected no error, got %v", err) + } b.ReportAllocs() b.ResetTimer() + addrs := chunkAddresses(chunks) for i := 0; i < b.N; i++ { - err := mget(store, hs, nil) + err := mget(store, addrs, nil) if err != nil { b.Fatalf("mget failed: %v", err) } } } + +// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory. 
+type MapChunkStore struct { + chunks map[string]Chunk + mu sync.RWMutex +} + +func NewMapChunkStore() *MapChunkStore { + return &MapChunkStore{ + chunks: make(map[string]Chunk), + } +} + +func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error { + m.mu.Lock() + defer m.mu.Unlock() + m.chunks[ch.Address().Hex()] = ch + return nil +} + +func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) { + m.mu.RLock() + defer m.mu.RUnlock() + chunk := m.chunks[ref.Hex()] + if chunk == nil { + return nil, ErrChunkNotFound + } + return chunk, nil +} + +func (m *MapChunkStore) Close() { +} + +func chunkAddresses(chunks []Chunk) []Address { + addrs := make([]Address, len(chunks)) + for i, ch := range chunks { + addrs[i] = ch.Address() + } + return addrs +} diff --git a/swarm/storage/dbapi.go b/swarm/storage/dbapi.go deleted file mode 100644 index dd71752eb..000000000 --- a/swarm/storage/dbapi.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
- -package storage - -import "context" - -// wrapper of db-s to provide mockable custom local chunk store access to syncer -type DBAPI struct { - db *LDBStore - loc *LocalStore -} - -func NewDBAPI(loc *LocalStore) *DBAPI { - return &DBAPI{loc.DbStore, loc} -} - -// to obtain the chunks from address or request db entry only -func (d *DBAPI) Get(ctx context.Context, addr Address) (*Chunk, error) { - return d.loc.Get(ctx, addr) -} - -// current storage counter of chunk db -func (d *DBAPI) CurrentBucketStorageIndex(po uint8) uint64 { - return d.db.CurrentBucketStorageIndex(po) -} - -// iteration storage counter and proximity order -func (d *DBAPI) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { - return d.db.SyncIterator(from, to, po, f) -} - -// to obtain the chunks from address or request db entry only -func (d *DBAPI) GetOrCreateRequest(ctx context.Context, addr Address) (*Chunk, bool) { - return d.loc.GetOrCreateRequest(ctx, addr) -} - -// to obtain the chunks from key or request db entry only -func (d *DBAPI) Put(ctx context.Context, chunk *Chunk) { - d.loc.Put(ctx, chunk) -} diff --git a/swarm/storage/filestore_test.go b/swarm/storage/filestore_test.go index f3f597255..d79efb530 100644 --- a/swarm/storage/filestore_test.go +++ b/swarm/storage/filestore_test.go @@ -49,11 +49,11 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { fileStore := NewFileStore(localStore, NewFileStoreParams()) defer os.RemoveAll("/tmp/bzz") - reader, slice := generateRandomData(testDataSize) + reader, slice := GenerateRandomData(testDataSize) ctx := context.TODO() key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt) if err != nil { - t.Errorf("Store error: %v", err) + t.Fatalf("Store error: %v", err) } err = wait(ctx) if err != nil { @@ -66,13 +66,13 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { resultSlice := make([]byte, len(slice)) n, err := resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - 
t.Errorf("Retrieve error: %v", err) + t.Fatalf("Retrieve error: %v", err) } if n != len(slice) { - t.Errorf("Slice size error got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error.") + t.Fatalf("Comparison error.") } ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666) ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666) @@ -86,13 +86,13 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { } n, err = resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - t.Errorf("Retrieve error after removing memStore: %v", err) + t.Fatalf("Retrieve error after removing memStore: %v", err) } if n != len(slice) { - t.Errorf("Slice size error after removing memStore got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error after removing memStore got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error after removing memStore.") + t.Fatalf("Comparison error after removing memStore.") } } @@ -114,7 +114,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { DbStore: db, } fileStore := NewFileStore(localStore, NewFileStoreParams()) - reader, slice := generateRandomData(testDataSize) + reader, slice := GenerateRandomData(testDataSize) ctx := context.TODO() key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt) if err != nil { @@ -122,7 +122,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { } err = wait(ctx) if err != nil { - t.Errorf("Store error: %v", err) + t.Fatalf("Store error: %v", err) } resultReader, isEncrypted := fileStore.Retrieve(context.TODO(), key) if isEncrypted != toEncrypt { @@ -131,13 +131,13 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { resultSlice := make([]byte, len(slice)) n, err := resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - t.Errorf("Retrieve error: %v", err) + t.Fatalf("Retrieve error: %v", err) } if n != 
len(slice) { - t.Errorf("Slice size error got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error.") + t.Fatalf("Comparison error.") } // Clear memStore memStore.setCapacity(0) @@ -148,7 +148,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted) } if _, err = resultReader.ReadAt(resultSlice, 0); err == nil { - t.Errorf("Was able to read %d bytes from an empty memStore.", len(slice)) + t.Fatalf("Was able to read %d bytes from an empty memStore.", len(slice)) } // check how it works with localStore fileStore.ChunkStore = localStore @@ -162,12 +162,12 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { } n, err = resultReader.ReadAt(resultSlice, 0) if err != io.EOF { - t.Errorf("Retrieve error after clearing memStore: %v", err) + t.Fatalf("Retrieve error after clearing memStore: %v", err) } if n != len(slice) { - t.Errorf("Slice size error after clearing memStore got %d, expected %d.", n, len(slice)) + t.Fatalf("Slice size error after clearing memStore got %d, expected %d.", n, len(slice)) } if !bytes.Equal(slice, resultSlice) { - t.Errorf("Comparison error after clearing memStore.") + t.Fatalf("Comparison error after clearing memStore.") } } diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index 766207eae..879622b9a 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -19,10 +19,10 @@ package storage import ( "context" "fmt" - "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/crypto/sha3" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage/encryption" ) @@ -30,31 +30,36 @@ type hasherStore struct { store ChunkStore toEncrypt bool hashFunc SwarmHasher - hashSize int // content hash size - refSize int64 // reference 
size (content hash + possibly encryption key) - wg *sync.WaitGroup - closed chan struct{} + hashSize int // content hash size + refSize int64 // reference size (content hash + possibly encryption key) + nrChunks uint64 // number of chunks to store + errC chan error // global error channel + doneC chan struct{} // closed by Close() call to indicate that count is the final number of chunks + quitC chan struct{} // closed to quit unterminated routines } // NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces. // With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore // and the hasherStore will take core of encryption/decryption of data if necessary -func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore { +func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore { hashSize := hashFunc().Size() refSize := int64(hashSize) if toEncrypt { refSize += encryption.KeyLength } - return &hasherStore{ - store: chunkStore, + h := &hasherStore{ + store: store, toEncrypt: toEncrypt, hashFunc: hashFunc, hashSize: hashSize, refSize: refSize, - wg: &sync.WaitGroup{}, - closed: make(chan struct{}), + errC: make(chan error), + doneC: make(chan struct{}), + quitC: make(chan struct{}), } + + return h } // Put stores the chunkData into the ChunkStore of the hasherStore and returns the reference. @@ -62,7 +67,6 @@ func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) // Asynchronous function, the data will not necessarily be stored when it returns. 
func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference, error) { c := chunkData - size := chunkData.Size() var encryptionKey encryption.Key if h.toEncrypt { var err error @@ -71,29 +75,28 @@ func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference, return nil, err } } - chunk := h.createChunk(c, size) - + chunk := h.createChunk(c) h.storeChunk(ctx, chunk) - return Reference(append(chunk.Addr, encryptionKey...)), nil + return Reference(append(chunk.Address(), encryptionKey...)), nil } // Get returns data of the chunk with the given reference (retrieved from the ChunkStore of hasherStore). // If the data is encrypted and the reference contains an encryption key, it will be decrypted before // return. func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error) { - key, encryptionKey, err := parseReference(ref, h.hashSize) + addr, encryptionKey, err := parseReference(ref, h.hashSize) if err != nil { return nil, err } - toDecrypt := (encryptionKey != nil) - chunk, err := h.store.Get(ctx, key) + chunk, err := h.store.Get(ctx, addr) if err != nil { return nil, err } - chunkData := chunk.SData + chunkData := ChunkData(chunk.Data()) + toDecrypt := (encryptionKey != nil) if toDecrypt { var err error chunkData, err = h.decryptChunkData(chunkData, encryptionKey) @@ -107,16 +110,40 @@ func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error) // Close indicates that no more chunks will be put with the hasherStore, so the Wait // function can return when all the previously put chunks has been stored. 
func (h *hasherStore) Close() { - close(h.closed) + close(h.doneC) } // Wait returns when // 1) the Close() function has been called and // 2) all the chunks which has been Put has been stored func (h *hasherStore) Wait(ctx context.Context) error { - <-h.closed - h.wg.Wait() - return nil + defer close(h.quitC) + var nrStoredChunks uint64 // number of stored chunks + var done bool + doneC := h.doneC + for { + select { + // if context is done earlier, just return with the error + case <-ctx.Done(): + return ctx.Err() + // doneC is closed if all chunks have been submitted, from then we just wait until all of them are also stored + case <-doneC: + done = true + doneC = nil + // a chunk has been stored, if err is nil, then successfully, so increase the stored chunk counter + case err := <-h.errC: + if err != nil { + return err + } + nrStoredChunks++ + } + // if all the chunks have been submitted and all of them are stored, then we can return + if done { + if nrStoredChunks >= atomic.LoadUint64(&h.nrChunks) { + return nil + } + } + } } func (h *hasherStore) createHash(chunkData ChunkData) Address { @@ -126,12 +153,9 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address { return hasher.Sum(nil) } -func (h *hasherStore) createChunk(chunkData ChunkData, chunkSize int64) *Chunk { +func (h *hasherStore) createChunk(chunkData ChunkData) *chunk { hash := h.createHash(chunkData) - chunk := NewChunk(hash, nil) - chunk.SData = chunkData - chunk.Size = chunkSize - + chunk := NewChunk(hash, chunkData) return chunk } @@ -162,10 +186,10 @@ func (h *hasherStore) decryptChunkData(chunkData ChunkData, encryptionKey encryp // removing extra bytes which were just added for padding length := ChunkData(decryptedSpan).Size() - for length > chunk.DefaultSize { - length = length + (chunk.DefaultSize - 1) - length = length / chunk.DefaultSize - length *= h.refSize + for length > ch.DefaultSize { + length = length + (ch.DefaultSize - 1) + length = length / ch.DefaultSize + length *= 
uint64(h.refSize) } c := make(ChunkData, length+8) @@ -205,32 +229,32 @@ func (h *hasherStore) decrypt(chunkData ChunkData, key encryption.Key) ([]byte, } func (h *hasherStore) newSpanEncryption(key encryption.Key) encryption.Encryption { - return encryption.New(key, 0, uint32(chunk.DefaultSize/h.refSize), sha3.NewKeccak256) + return encryption.New(key, 0, uint32(ch.DefaultSize/h.refSize), sha3.NewKeccak256) } func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryption { - return encryption.New(key, int(chunk.DefaultSize), 0, sha3.NewKeccak256) + return encryption.New(key, int(ch.DefaultSize), 0, sha3.NewKeccak256) } -func (h *hasherStore) storeChunk(ctx context.Context, chunk *Chunk) { - h.wg.Add(1) +func (h *hasherStore) storeChunk(ctx context.Context, chunk *chunk) { + atomic.AddUint64(&h.nrChunks, 1) go func() { - <-chunk.dbStoredC - h.wg.Done() + select { + case h.errC <- h.store.Put(ctx, chunk): + case <-h.quitC: + } }() - h.store.Put(ctx, chunk) } func parseReference(ref Reference, hashSize int) (Address, encryption.Key, error) { - encryptedKeyLength := hashSize + encryption.KeyLength + encryptedRefLength := hashSize + encryption.KeyLength switch len(ref) { - case KeyLength: + case AddressLength: return Address(ref), nil, nil - case encryptedKeyLength: + case encryptedRefLength: encKeyIdx := len(ref) - encryption.KeyLength return Address(ref[:encKeyIdx]), encryption.Key(ref[encKeyIdx:]), nil default: - return nil, nil, fmt.Errorf("Invalid reference length, expected %v or %v got %v", hashSize, encryptedKeyLength, len(ref)) + return nil, nil, fmt.Errorf("Invalid reference length, expected %v or %v got %v", hashSize, encryptedRefLength, len(ref)) } - } diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go index ddf1c39b0..22cf98d0e 100644 --- a/swarm/storage/hasherstore_test.go +++ b/swarm/storage/hasherstore_test.go @@ -46,14 +46,16 @@ func TestHasherStore(t *testing.T) { hasherStore := 
NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt) // Put two random chunks into the hasherStore - chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key1, err := hasherStore.Put(context.TODO(), chunkData1) + chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data() + ctx, cancel := context.WithTimeout(context.Background(), getTimeout) + defer cancel() + key1, err := hasherStore.Put(ctx, chunkData1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } - chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).SData - key2, err := hasherStore.Put(context.TODO(), chunkData2) + chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).Data() + key2, err := hasherStore.Put(ctx, chunkData2) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } @@ -61,13 +63,13 @@ func TestHasherStore(t *testing.T) { hasherStore.Close() // Wait until chunks are really stored - err = hasherStore.Wait(context.TODO()) + err = hasherStore.Wait(ctx) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } // Get the first chunk - retrievedChunkData1, err := hasherStore.Get(context.TODO(), key1) + retrievedChunkData1, err := hasherStore.Get(ctx, key1) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -78,7 +80,7 @@ func TestHasherStore(t *testing.T) { } // Get the second chunk - retrievedChunkData2, err := hasherStore.Get(context.TODO(), key2) + retrievedChunkData2, err := hasherStore.Get(ctx, key2) if err != nil { t.Fatalf("Expected no error, got \"%v\"", err) } @@ -105,12 +107,12 @@ func TestHasherStore(t *testing.T) { } // Check if chunk data in store is encrypted or not - chunkInStore, err := chunkStore.Get(context.TODO(), hash1) + chunkInStore, err := chunkStore.Get(ctx, hash1) if err != nil { t.Fatalf("Expected no error got \"%v\"", err) } - chunkDataInStore := chunkInStore.SData + chunkDataInStore := chunkInStore.Data() if tt.toEncrypt && bytes.Equal(chunkData1, chunkDataInStore) { t.Fatalf("Chunk expected 
to be encrypted but it is stored without encryption") diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 675b5de01..8ab7e60b3 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -28,6 +28,7 @@ import ( "context" "encoding/binary" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -36,7 +37,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/syndtr/goleveldb/leveldb" @@ -62,6 +63,10 @@ var ( keyDistanceCnt = byte(7) ) +var ( + ErrDBClosed = errors.New("LDBStore closed") +) + type gcItem struct { idx uint64 value uint64 @@ -99,18 +104,29 @@ type LDBStore struct { batchC chan bool batchesC chan struct{} - batch *leveldb.Batch + closed bool + batch *dbBatch lock sync.RWMutex quit chan struct{} // Functions encodeDataFunc is used to bypass // the default functionality of DbStore with // mock.NodeStore for testing purposes. - encodeDataFunc func(chunk *Chunk) []byte + encodeDataFunc func(chunk Chunk) []byte // If getDataFunc is defined, it will be used for // retrieving the chunk data instead from the local // LevelDB database. 
- getDataFunc func(addr Address) (data []byte, err error) + getDataFunc func(key Address) (data []byte, err error) +} + +type dbBatch struct { + *leveldb.Batch + err error + c chan struct{} +} + +func newBatch() *dbBatch { + return &dbBatch{Batch: new(leveldb.Batch), c: make(chan struct{})} } // TODO: Instead of passing the distance function, just pass the address from which distances are calculated @@ -121,10 +137,9 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { s.hashfunc = params.Hash s.quit = make(chan struct{}) - s.batchC = make(chan bool) s.batchesC = make(chan struct{}, 1) go s.writeBatches() - s.batch = new(leveldb.Batch) + s.batch = newBatch() // associate encodeData with default functionality s.encodeDataFunc = encodeData @@ -143,7 +158,6 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { k[1] = uint8(i) cnt, _ := s.db.Get(k) s.bucketCnt[i] = BytesToU64(cnt) - s.bucketCnt[i]++ } data, _ := s.db.Get(keyEntryCnt) s.entryCnt = BytesToU64(data) @@ -202,14 +216,6 @@ func getIndexKey(hash Address) []byte { return key } -func getOldDataKey(idx uint64) []byte { - key := make([]byte, 9) - key[0] = keyOldData - binary.BigEndian.PutUint64(key[1:9], idx) - - return key -} - func getDataKey(idx uint64, po uint8) []byte { key := make([]byte, 10) key[0] = keyData @@ -224,12 +230,12 @@ func encodeIndex(index *dpaDBIndex) []byte { return data } -func encodeData(chunk *Chunk) []byte { +func encodeData(chunk Chunk) []byte { // Always create a new underlying array for the returned byte slice. - // The chunk.Key array may be used in the returned slice which + // The chunk.Address array may be used in the returned slice which // may be changed later in the code or by the LevelDB, resulting - // that the Key is changed as well. - return append(append([]byte{}, chunk.Addr[:]...), chunk.SData...) + // that the Address is changed as well. + return append(append([]byte{}, chunk.Address()[:]...), chunk.Data()...) 
} func decodeIndex(data []byte, index *dpaDBIndex) error { @@ -237,14 +243,8 @@ func decodeIndex(data []byte, index *dpaDBIndex) error { return dec.Decode(index) } -func decodeData(data []byte, chunk *Chunk) { - chunk.SData = data[32:] - chunk.Size = int64(binary.BigEndian.Uint64(data[32:40])) -} - -func decodeOldData(data []byte, chunk *Chunk) { - chunk.SData = data - chunk.Size = int64(binary.BigEndian.Uint64(data[0:8])) +func decodeData(addr Address, data []byte) (*chunk, error) { + return NewChunk(addr, data[32:]), nil } func (s *LDBStore) collectGarbage(ratio float32) { @@ -347,44 +347,75 @@ func (s *LDBStore) Export(out io.Writer) (int64, error) { func (s *LDBStore) Import(in io.Reader) (int64, error) { tr := tar.NewReader(in) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + countC := make(chan int64) + errC := make(chan error) var count int64 - var wg sync.WaitGroup - for { - hdr, err := tr.Next() - if err == io.EOF { - break - } else if err != nil { - return count, err - } + go func() { + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } else if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } - if len(hdr.Name) != 64 { - log.Warn("ignoring non-chunk file", "name", hdr.Name) - continue - } + if len(hdr.Name) != 64 { + log.Warn("ignoring non-chunk file", "name", hdr.Name) + continue + } - keybytes, err := hex.DecodeString(hdr.Name) - if err != nil { - log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) - continue + keybytes, err := hex.DecodeString(hdr.Name) + if err != nil { + log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) + continue + } + + data, err := ioutil.ReadAll(tr) + if err != nil { + select { + case errC <- err: + case <-ctx.Done(): + } + } + key := Address(keybytes) + chunk := NewChunk(key, data[32:]) + + go func() { + select { + case errC <- s.Put(ctx, chunk): + case <-ctx.Done(): + } + }() + + count++ } + countC <- count + }() - data, err 
:= ioutil.ReadAll(tr) - if err != nil { - return count, err + // wait for all chunks to be stored + i := int64(0) + var total int64 + for { + select { + case err := <-errC: + if err != nil { + return count, err + } + i++ + case total = <-countC: + case <-ctx.Done(): + return i, ctx.Err() + } + if total > 0 && i == total { + return total, nil } - key := Address(keybytes) - chunk := NewChunk(key, nil) - chunk.SData = data[32:] - s.Put(context.TODO(), chunk) - wg.Add(1) - go func() { - defer wg.Done() - <-chunk.dbStoredC - }() - count++ } - wg.Wait() - return count, nil } func (s *LDBStore) Cleanup() { @@ -430,15 +461,18 @@ func (s *LDBStore) Cleanup() { } } - c := &Chunk{} ck := data[:32] - decodeData(data, c) + c, err := decodeData(ck, data) + if err != nil { + log.Error("decodeData error", "err", err) + continue + } - cs := int64(binary.LittleEndian.Uint64(c.SData[:8])) - log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + cs := int64(binary.LittleEndian.Uint64(c.sdata[:8])) + log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) - if len(c.SData) > chunk.DefaultSize+8 { - log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + if len(c.sdata) > ch.DefaultSize+8 { + log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) s.delete(index.Idx, getIndexKey(key[1:]), po) removed++ errorsFound++ @@ -511,7 +545,6 @@ func (s 
*LDBStore) delete(idx uint64, idxKey []byte, po uint8) { batch.Delete(getDataKey(idx, po)) s.entryCnt-- dbEntryCount.Dec(1) - s.bucketCnt[po]-- cntKey := make([]byte, 2) cntKey[0] = keyDistanceCnt cntKey[1] = po @@ -520,10 +553,9 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { s.db.Write(batch) } -func (s *LDBStore) CurrentBucketStorageIndex(po uint8) uint64 { +func (s *LDBStore) BinIndex(po uint8) uint64 { s.lock.RLock() defer s.lock.RUnlock() - return s.bucketCnt[po] } @@ -539,43 +571,53 @@ func (s *LDBStore) CurrentStorageIndex() uint64 { return s.dataIdx } -func (s *LDBStore) Put(ctx context.Context, chunk *Chunk) { +func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1) - log.Trace("ldbstore.put", "key", chunk.Addr) + log.Trace("ldbstore.put", "key", chunk.Address()) - ikey := getIndexKey(chunk.Addr) + ikey := getIndexKey(chunk.Address()) var index dpaDBIndex - po := s.po(chunk.Addr) + po := s.po(chunk.Address()) + s.lock.Lock() - defer s.lock.Unlock() - log.Trace("ldbstore.put: s.db.Get", "key", chunk.Addr, "ikey", fmt.Sprintf("%x", ikey)) + if s.closed { + s.lock.Unlock() + return ErrDBClosed + } + batch := s.batch + + log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey)) idata, err := s.db.Get(ikey) if err != nil { s.doPut(chunk, &index, po) - batchC := s.batchC - go func() { - <-batchC - chunk.markAsStored() - }() } else { - log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Addr) + log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address) decodeIndex(idata, &index) - chunk.markAsStored() } index.Access = s.accessCnt s.accessCnt++ idata = encodeIndex(&index) s.batch.Put(ikey, idata) + + s.lock.Unlock() + select { case s.batchesC <- struct{}{}: default: } + + select { + case <-batch.c: + return batch.err + case <-ctx.Done(): + return ctx.Err() + } } // force putting into 
db, does not check access index -func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) { +func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) { data := s.encodeDataFunc(chunk) dkey := getDataKey(s.dataIdx, po) s.batch.Put(dkey, data) @@ -592,58 +634,64 @@ func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) { } func (s *LDBStore) writeBatches() { -mainLoop: for { select { case <-s.quit: - break mainLoop + log.Debug("DbStore: quit batch write loop") + return case <-s.batchesC: - s.lock.Lock() - b := s.batch - e := s.entryCnt - d := s.dataIdx - a := s.accessCnt - c := s.batchC - s.batchC = make(chan bool) - s.batch = new(leveldb.Batch) - err := s.writeBatch(b, e, d, a) - // TODO: set this error on the batch, then tell the chunk + err := s.writeCurrentBatch() if err != nil { - log.Error(fmt.Sprintf("spawn batch write (%d entries): %v", b.Len(), err)) + log.Debug("DbStore: quit batch write loop", "err", err.Error()) + return } - close(c) - for e > s.capacity { - log.Trace("for >", "e", e, "s.capacity", s.capacity) - // Collect garbage in a separate goroutine - // to be able to interrupt this loop by s.quit. - done := make(chan struct{}) - go func() { - s.collectGarbage(gcArrayFreeRatio) - log.Trace("collectGarbage closing done") - close(done) - }() + } + } - select { - case <-s.quit: - s.lock.Unlock() - break mainLoop - case <-done: - } - e = s.entryCnt - } - s.lock.Unlock() +} + +func (s *LDBStore) writeCurrentBatch() error { + s.lock.Lock() + defer s.lock.Unlock() + b := s.batch + l := b.Len() + if l == 0 { + return nil + } + e := s.entryCnt + d := s.dataIdx + a := s.accessCnt + s.batch = newBatch() + b.err = s.writeBatch(b, e, d, a) + close(b.c) + for e > s.capacity { + log.Trace("for >", "e", e, "s.capacity", s.capacity) + // Collect garbage in a separate goroutine + // to be able to interrupt this loop by s.quit. 
+ done := make(chan struct{}) + go func() { + s.collectGarbage(gcArrayFreeRatio) + log.Trace("collectGarbage closing done") + close(done) + }() + + select { + case <-s.quit: + return errors.New("CollectGarbage terminated due to quit") + case <-done: } + e = s.entryCnt } - log.Trace(fmt.Sprintf("DbStore: quit batch write loop")) + return nil } // must be called non concurrently -func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uint64) error { +func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error { b.Put(keyEntryCnt, U64ToBytes(entryCnt)) b.Put(keyDataIdx, U64ToBytes(dataIdx)) b.Put(keyAccessCnt, U64ToBytes(accessCnt)) l := b.Len() - if err := s.db.Write(b); err != nil { + if err := s.db.Write(b.Batch); err != nil { return fmt.Errorf("unable to write batch: %v", err) } log.Trace(fmt.Sprintf("batch write (%d entries)", l)) @@ -654,12 +702,12 @@ func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uin // to a mock store to bypass the default functionality encodeData. // The constructed function always returns the nil data, as DbStore does // not need to store the data, but still need to create the index. 
-func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk *Chunk) []byte { - return func(chunk *Chunk) []byte { - if err := mockStore.Put(chunk.Addr, encodeData(chunk)); err != nil { - log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Addr.Log(), err)) +func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte { + return func(chunk Chunk) []byte { + if err := mockStore.Put(chunk.Address(), encodeData(chunk)); err != nil { + log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err)) } - return chunk.Addr[:] + return chunk.Address()[:] } } @@ -682,7 +730,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { return true } -func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) { metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1) log.Trace("ldbstore.get", "key", addr) @@ -691,9 +739,11 @@ func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err err return s.get(addr) } -func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) { +func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { var indx dpaDBIndex - + if s.closed { + return nil, ErrDBClosed + } if s.tryAccessIdx(getIndexKey(addr), &indx) { var data []byte if s.getDataFunc != nil { @@ -716,9 +766,7 @@ func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) { } } - chunk = NewChunk(addr, nil) - chunk.markAsStored() - decodeData(data, chunk) + return decodeData(addr, data) } else { err = ErrChunkNotFound } @@ -772,6 +820,12 @@ func (s *LDBStore) setCapacity(c uint64) { func (s *LDBStore) Close() { close(s.quit) + s.lock.Lock() + s.closed = true + s.lock.Unlock() + // force writing out current batch + s.writeCurrentBatch() + close(s.batchesC) s.db.Close() } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 9694a724e..ae70ee259 100644 --- 
a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -22,13 +22,12 @@ import ( "fmt" "io/ioutil" "os" - "sync" "testing" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" ldberrors "github.com/syndtr/goleveldb/leveldb/errors" @@ -86,70 +85,54 @@ func (db *testDbStore) close() { } } -func testDbStoreRandom(n int, processors int, chunksize int64, mock bool, t *testing.T) { +func testDbStoreRandom(n int, chunksize int64, mock bool, t *testing.T) { db, cleanup, err := newTestDbStore(mock, true) defer cleanup() if err != nil { t.Fatalf("init dbStore failed: %v", err) } - testStoreRandom(db, processors, n, chunksize, t) + testStoreRandom(db, n, chunksize, t) } -func testDbStoreCorrect(n int, processors int, chunksize int64, mock bool, t *testing.T) { +func testDbStoreCorrect(n int, chunksize int64, mock bool, t *testing.T) { db, cleanup, err := newTestDbStore(mock, false) defer cleanup() if err != nil { t.Fatalf("init dbStore failed: %v", err) } - testStoreCorrect(db, processors, n, chunksize, t) + testStoreCorrect(db, n, chunksize, t) } func TestDbStoreRandom_1(t *testing.T) { - testDbStoreRandom(1, 1, 0, false, t) + testDbStoreRandom(1, 0, false, t) } func TestDbStoreCorrect_1(t *testing.T) { - testDbStoreCorrect(1, 1, 4096, false, t) + testDbStoreCorrect(1, 4096, false, t) } -func TestDbStoreRandom_1_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, false, t) +func TestDbStoreRandom_5k(t *testing.T) { + testDbStoreRandom(5000, 0, false, t) } -func TestDbStoreRandom_8_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, false, t) -} - -func TestDbStoreCorrect_1_5k(t *testing.T) { - testDbStoreCorrect(1, 5000, 4096, false, t) -} - -func TestDbStoreCorrect_8_5k(t *testing.T) { - testDbStoreCorrect(8, 5000, 4096, 
false, t) +func TestDbStoreCorrect_5k(t *testing.T) { + testDbStoreCorrect(5000, 4096, false, t) } func TestMockDbStoreRandom_1(t *testing.T) { - testDbStoreRandom(1, 1, 0, true, t) + testDbStoreRandom(1, 0, true, t) } func TestMockDbStoreCorrect_1(t *testing.T) { - testDbStoreCorrect(1, 1, 4096, true, t) + testDbStoreCorrect(1, 4096, true, t) } -func TestMockDbStoreRandom_1_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, true, t) +func TestMockDbStoreRandom_5k(t *testing.T) { + testDbStoreRandom(5000, 0, true, t) } -func TestMockDbStoreRandom_8_5k(t *testing.T) { - testDbStoreRandom(8, 5000, 0, true, t) -} - -func TestMockDbStoreCorrect_1_5k(t *testing.T) { - testDbStoreCorrect(1, 5000, 4096, true, t) -} - -func TestMockDbStoreCorrect_8_5k(t *testing.T) { - testDbStoreCorrect(8, 5000, 4096, true, t) +func TestMockDbStoreCorrect_5k(t *testing.T) { + testDbStoreCorrect(5000, 4096, true, t) } func testDbStoreNotFound(t *testing.T, mock bool) { @@ -185,26 +168,19 @@ func testIterator(t *testing.T, mock bool) { t.Fatalf("init dbStore failed: %v", err) } - chunks := GenerateRandomChunks(chunk.DefaultSize, chunkcount) + chunks := GenerateRandomChunks(ch.DefaultSize, chunkcount) - wg := &sync.WaitGroup{} - wg.Add(len(chunks)) for i = 0; i < len(chunks); i++ { - db.Put(context.TODO(), chunks[i]) - chunkkeys[i] = chunks[i].Addr - j := i - go func() { - defer wg.Done() - <-chunks[j].dbStoredC - }() + chunkkeys[i] = chunks[i].Address() + err := db.Put(context.TODO(), chunks[i]) + if err != nil { + t.Fatalf("dbStore.Put failed: %v", err) + } } - //testSplit(m, l, 128, chunkkeys, t) - for i = 0; i < len(chunkkeys); i++ { log.Trace(fmt.Sprintf("Chunk array pos %d/%d: '%v'", i, chunkcount, chunkkeys[i])) } - wg.Wait() i = 0 for poc = 0; poc <= 255; poc++ { err := db.SyncIterator(0, uint64(chunkkeys.Len()), uint8(poc), func(k Address, n uint64) bool { @@ -239,7 +215,7 @@ func benchmarkDbStorePut(n int, processors int, chunksize int64, mock bool, b *t if err != nil { 
b.Fatalf("init dbStore failed: %v", err) } - benchmarkStorePut(db, processors, n, chunksize, b) + benchmarkStorePut(db, n, chunksize, b) } func benchmarkDbStoreGet(n int, processors int, chunksize int64, mock bool, b *testing.B) { @@ -248,7 +224,7 @@ func benchmarkDbStoreGet(n int, processors int, chunksize int64, mock bool, b *t if err != nil { b.Fatalf("init dbStore failed: %v", err) } - benchmarkStoreGet(db, processors, n, chunksize, b) + benchmarkStoreGet(db, n, chunksize, b) } func BenchmarkDbStorePut_1_500(b *testing.B) { @@ -293,35 +269,22 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) { ldb.setCapacity(uint64(capacity)) defer cleanup() - chunks := []*Chunk{} - for i := 0; i < n; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) - chunks = append(chunks, c) - log.Trace("generate random chunk", "idx", i, "chunk", c) - } - - for i := 0; i < n; i++ { - go ldb.Put(context.TODO(), chunks[i]) - } - - // wait for all chunks to be stored - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC + chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) + if err != nil { + t.Fatal(err.Error()) } log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) - for i := 0; i < n; i++ { - ret, err := ldb.Get(context.TODO(), chunks[i].Addr) + for _, ch := range chunks { + ret, err := ldb.Get(context.TODO(), ch.Address()) if err != nil { t.Fatal(err) } - if !bytes.Equal(ret.SData, chunks[i].SData) { + if !bytes.Equal(ret.Data(), ch.Data()) { t.Fatal("expected to get the same data back, but got smth else") } - - log.Info("got back chunk", "chunk", ret) } if ldb.entryCnt != uint64(n) { @@ -343,30 +306,18 @@ func TestLDBStoreCollectGarbage(t *testing.T) { ldb.setCapacity(uint64(capacity)) defer cleanup() - chunks := []*Chunk{} - for i := 0; i < n; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) - chunks = append(chunks, c) - log.Trace("generate random chunk", "idx", i, "chunk", c) - } - - for i := 0; i < n; i++ { - ldb.Put(context.TODO(), chunks[i]) 
- } - - // wait for all chunks to be stored - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC + chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) + if err != nil { + t.Fatal(err.Error()) } - log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) // wait for garbage collection to kick in on the responsible actor time.Sleep(1 * time.Second) var missing int - for i := 0; i < n; i++ { - ret, err := ldb.Get(context.TODO(), chunks[i].Addr) + for _, ch := range chunks { + ret, err := ldb.Get(context.Background(), ch.Address()) if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { missing++ continue @@ -375,7 +326,7 @@ func TestLDBStoreCollectGarbage(t *testing.T) { t.Fatal(err) } - if !bytes.Equal(ret.SData, chunks[i].SData) { + if !bytes.Equal(ret.Data(), ch.Data()) { t.Fatal("expected to get the same data back, but got smth else") } @@ -396,38 +347,27 @@ func TestLDBStoreAddRemove(t *testing.T) { defer cleanup() n := 100 - - chunks := []*Chunk{} - for i := 0; i < n; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) - chunks = append(chunks, c) - log.Trace("generate random chunk", "idx", i, "chunk", c) - } - - for i := 0; i < n; i++ { - go ldb.Put(context.TODO(), chunks[i]) - } - - // wait for all chunks to be stored before continuing - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC + chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) + if err != nil { + t.Fatalf(err.Error()) } for i := 0; i < n; i++ { // delete all even index chunks if i%2 == 0 { - ldb.Delete(chunks[i].Addr) + ldb.Delete(chunks[i].Address()) } } log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) for i := 0; i < n; i++ { - ret, err := ldb.Get(context.TODO(), chunks[i].Addr) + ret, err := ldb.Get(nil, chunks[i].Address()) if i%2 == 0 { // expect even chunks to be missing - if err == nil || ret != nil { + if err == nil { + // if err != ErrChunkNotFound { t.Fatal("expected chunk to be missing, but got no error") } } else { @@ 
-436,7 +376,7 @@ func TestLDBStoreAddRemove(t *testing.T) { t.Fatalf("expected no error, but got %s", err) } - if !bytes.Equal(ret.SData, chunks[i].SData) { + if !bytes.Equal(ret.Data(), chunks[i].Data()) { t.Fatal("expected to get the same data back, but got smth else") } } @@ -446,15 +386,16 @@ func TestLDBStoreAddRemove(t *testing.T) { // TestLDBStoreRemoveThenCollectGarbage tests that we can delete chunks and that we can trigger garbage collection func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { capacity := 11 + surplus := 4 ldb, cleanup := newLDBStore(t) ldb.setCapacity(uint64(capacity)) - n := 11 + n := capacity - chunks := []*Chunk{} - for i := 0; i < capacity; i++ { - c := GenerateRandomChunk(chunk.DefaultSize) + chunks := []Chunk{} + for i := 0; i < n+surplus; i++ { + c := GenerateRandomChunk(ch.DefaultSize) chunks = append(chunks, c) log.Trace("generate random chunk", "idx", i, "chunk", c) } @@ -463,53 +404,54 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { ldb.Put(context.TODO(), chunks[i]) } - // wait for all chunks to be stored before continuing - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC - } - // delete all chunks for i := 0; i < n; i++ { - ldb.Delete(chunks[i].Addr) + ldb.Delete(chunks[i].Address()) } log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + if ldb.entryCnt != 0 { + t.Fatalf("ldb.entrCnt expected 0 got %v", ldb.entryCnt) + } + + expAccessCnt := uint64(n * 2) + if ldb.accessCnt != expAccessCnt { + t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.entryCnt) + } + cleanup() ldb, cleanup = newLDBStore(t) capacity = 10 ldb.setCapacity(uint64(capacity)) + defer cleanup() - n = 11 + n = capacity + surplus for i := 0; i < n; i++ { ldb.Put(context.TODO(), chunks[i]) } - // wait for all chunks to be stored before continuing - for i := 0; i < n; i++ { - <-chunks[i].dbStoredC - } - // wait for garbage collection time.Sleep(1 * time.Second) - // expect for first chunk to be missing, 
because it has the smallest access value - idx := 0 - ret, err := ldb.Get(context.TODO(), chunks[idx].Addr) - if err == nil || ret != nil { - t.Fatal("expected first chunk to be missing, but got no error") - } - - // expect for last chunk to be present, as it has the largest access value - idx = 9 - ret, err = ldb.Get(context.TODO(), chunks[idx].Addr) - if err != nil { - t.Fatalf("expected no error, but got %s", err) + // expect first surplus chunks to be missing, because they have the smallest access value + for i := 0; i < surplus; i++ { + _, err := ldb.Get(context.TODO(), chunks[i].Address()) + if err == nil { + t.Fatal("expected surplus chunk to be missing, but got no error") + } } - if !bytes.Equal(ret.SData, chunks[idx].SData) { - t.Fatal("expected to get the same data back, but got smth else") + // expect last chunks to be present, as they have the largest access value + for i := surplus; i < surplus+capacity; i++ { + ret, err := ldb.Get(context.TODO(), chunks[i].Address()) + if err != nil { + t.Fatalf("chunk %v: expected no error, but got %s", i, err) + } + if !bytes.Equal(ret.Data(), chunks[i].Data()) { + t.Fatal("expected to get the same data back, but got smth else") + } } } diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 9e3474979..04701ee69 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -18,8 +18,6 @@ package storage import ( "context" - "encoding/binary" - "fmt" "path/filepath" "sync" @@ -97,123 +95,89 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { // when the chunk is stored in memstore. // After the LDBStore.Put, it is ensured that the MemStore // contains the chunk with the same data, but nil ReqC channel. -func (ls *LocalStore) Put(ctx context.Context, chunk *Chunk) { +func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error { valid := true // ls.Validators contains a list of one validator per chunk type. 
// if one validator succeeds, then the chunk is valid for _, v := range ls.Validators { - if valid = v.Validate(chunk.Addr, chunk.SData); valid { + if valid = v.Validate(chunk.Address(), chunk.Data()); valid { break } } if !valid { - log.Trace("invalid chunk", "addr", chunk.Addr, "len", len(chunk.SData)) - chunk.SetErrored(ErrChunkInvalid) - chunk.markAsStored() - return + return ErrChunkInvalid } - log.Trace("localstore.put", "addr", chunk.Addr) - + log.Trace("localstore.put", "key", chunk.Address()) ls.mu.Lock() defer ls.mu.Unlock() - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) - - memChunk, err := ls.memStore.Get(ctx, chunk.Addr) - switch err { - case nil: - if memChunk.ReqC == nil { - chunk.markAsStored() - return - } - case ErrChunkNotFound: - default: - chunk.SetErrored(err) - return + _, err := ls.memStore.Get(ctx, chunk.Address()) + if err == nil { + return nil } - - ls.DbStore.Put(ctx, chunk) - - // chunk is no longer a request, but a chunk with data, so replace it in memStore - newc := NewChunk(chunk.Addr, nil) - newc.SData = chunk.SData - newc.Size = chunk.Size - newc.dbStoredC = chunk.dbStoredC - - ls.memStore.Put(ctx, newc) - - if memChunk != nil && memChunk.ReqC != nil { - close(memChunk.ReqC) + if err != nil && err != ErrChunkNotFound { + return err } + ls.memStore.Put(ctx, chunk) + err = ls.DbStore.Put(ctx, chunk) + return err } // Get(chunk *Chunk) looks up a chunk in the local stores // This method is blocking until the chunk is retrieved // so additional timeout may be needed to wrap this call if // ChunkStores are remote and can have long latency -func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) { ls.mu.Lock() defer ls.mu.Unlock() return ls.get(ctx, addr) } -func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk *Chunk, err error) { +func (ls *LocalStore) get(ctx context.Context, addr 
Address) (chunk Chunk, err error) { chunk, err = ls.memStore.Get(ctx, addr) + + if err != nil && err != ErrChunkNotFound { + metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) + return nil, err + } + if err == nil { - if chunk.ReqC != nil { - select { - case <-chunk.ReqC: - default: - metrics.GetOrRegisterCounter("localstore.get.errfetching", nil).Inc(1) - return chunk, ErrFetching - } - } metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1) - return + return chunk, nil } + metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1) chunk, err = ls.DbStore.Get(ctx, addr) if err != nil { metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1) - return + return nil, err } - chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) + ls.memStore.Put(ctx, chunk) - return + return chunk, nil } -// retrieve logic common for local and network chunk retrieval requests -func (ls *LocalStore) GetOrCreateRequest(ctx context.Context, addr Address) (chunk *Chunk, created bool) { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1) - +func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error { ls.mu.Lock() defer ls.mu.Unlock() - var err error - chunk, err = ls.get(ctx, addr) - if err == nil && chunk.GetErrored() == nil { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr)) - return chunk, false + _, err := ls.get(ctx, addr) + if err == nil { + return nil } - if err == ErrFetching && chunk.GetErrored() == nil { - metrics.GetOrRegisterCounter("localstore.getorcreaterequest.errfetching", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v hit on an existing request %v", addr, chunk.ReqC)) - return chunk, false + return func(context.Context) error { + return err } - // no data and no request status - 
metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1) - log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr)) - chunk = NewChunk(addr, make(chan bool)) - ls.memStore.Put(ctx, chunk) - return chunk, true } -// RequestsCacheLen returns the current number of outgoing requests stored in the cache -func (ls *LocalStore) RequestsCacheLen() int { - return ls.memStore.requests.Len() +func (ls *LocalStore) BinIndex(po uint8) uint64 { + return ls.DbStore.BinIndex(po) +} + +func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { + return ls.DbStore.SyncIterator(from, to, po, f) } // Close the local store diff --git a/swarm/storage/localstore_test.go b/swarm/storage/localstore_test.go index ae62218fe..814d270d3 100644 --- a/swarm/storage/localstore_test.go +++ b/swarm/storage/localstore_test.go @@ -17,11 +17,12 @@ package storage import ( + "context" "io/ioutil" "os" "testing" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" ) var ( @@ -50,29 +51,29 @@ func TestValidator(t *testing.T) { chunks := GenerateRandomChunks(259, 2) goodChunk := chunks[0] badChunk := chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs := putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good content address chunk in spite of no validation, but got: %s", err) } - if err := badChunk.GetErrored(); err != nil { + if errs[1] != nil { t.Fatalf("expected no error on bad content address chunk in spite of no validation, but got: %s", err) } // add content address validator and check puts // bad should fail, good should pass store.Validators = append(store.Validators, NewContentAddressValidator(hashfunc)) - chunks = GenerateRandomChunks(chunk.DefaultSize, 2) + chunks = 
GenerateRandomChunks(ch.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs = putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good content address chunk with content address validator only, but got: %s", err) } - if err := badChunk.GetErrored(); err == nil { + if errs[1] == nil { t.Fatal("expected error on bad content address chunk with content address validator only, but got nil") } @@ -81,16 +82,16 @@ func TestValidator(t *testing.T) { var negV boolTestValidator store.Validators = append(store.Validators, negV) - chunks = GenerateRandomChunks(chunk.DefaultSize, 2) + chunks = GenerateRandomChunks(ch.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs = putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good content address chunk with content address validator only, but got: %s", err) } - if err := badChunk.GetErrored(); err == nil { + if errs[1] == nil { t.Fatal("expected error on bad content address chunk with content address validator only, but got nil") } @@ -99,18 +100,19 @@ func TestValidator(t *testing.T) { var posV boolTestValidator = true store.Validators = append(store.Validators, posV) - chunks = GenerateRandomChunks(chunk.DefaultSize, 2) + chunks = GenerateRandomChunks(ch.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] - copy(badChunk.SData, goodChunk.SData) + copy(badChunk.Data(), goodChunk.Data()) - PutChunks(store, goodChunk, badChunk) - if err := goodChunk.GetErrored(); err != nil { + errs = putChunks(store, goodChunk, badChunk) + if errs[0] != nil { t.Fatalf("expected no error on good content 
address chunk with content address validator only, but got: %s", err) } - if err := badChunk.GetErrored(); err != nil { - t.Fatalf("expected no error on bad content address chunk with content address validator only, but got: %s", err) + if errs[1] != nil { + t.Fatalf("expected no error on bad content address chunk in spite of no validation, but got: %s", err) } + } type boolTestValidator bool @@ -118,3 +120,27 @@ type boolTestValidator bool func (self boolTestValidator) Validate(addr Address, data []byte) bool { return bool(self) } + +// putChunks adds chunks to localstore +// It waits for receive on the stored channel +// It logs but does not fail on delivery error +func putChunks(store *LocalStore, chunks ...Chunk) []error { + i := 0 + f := func(n int64) Chunk { + chunk := chunks[i] + i++ + return chunk + } + _, errs := put(store, len(chunks), f) + return errs +} + +func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs []error) { + for i := int64(0); i < int64(n); i++ { + chunk := f(ch.DefaultSize) + err := store.Put(context.TODO(), chunk) + errs = append(errs, err) + hs = append(hs, chunk.Address()) + } + return hs, errs +} diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 55cfcbfea..36b1e00d9 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -20,24 +20,17 @@ package storage import ( "context" - "sync" lru "github.com/hashicorp/golang-lru" ) type MemStore struct { cache *lru.Cache - requests *lru.Cache - mu sync.RWMutex disabled bool } -//NewMemStore is instantiating a MemStore cache. We are keeping a record of all outgoing requests for chunks, that -//should later be delivered by peer nodes, in the `requests` LRU cache. We are also keeping all frequently requested +//NewMemStore is instantiating a MemStore cache keeping all frequently requested //chunks in the `cache` LRU cache. 
-// -//`requests` LRU cache capacity should ideally never be reached, this is why for the time being it should be initialised -//with the same value as the LDBStore capacity. func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { if params.CacheCapacity == 0 { return &MemStore{ @@ -45,102 +38,48 @@ func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { } } - onEvicted := func(key interface{}, value interface{}) { - v := value.(*Chunk) - <-v.dbStoredC - } - c, err := lru.NewWithEvict(int(params.CacheCapacity), onEvicted) - if err != nil { - panic(err) - } - - requestEvicted := func(key interface{}, value interface{}) { - // temporary remove of the error log, until we figure out the problem, as it is too spamy - //log.Error("evict called on outgoing request") - } - r, err := lru.NewWithEvict(int(params.ChunkRequestsCacheCapacity), requestEvicted) + c, err := lru.New(int(params.CacheCapacity)) if err != nil { panic(err) } return &MemStore{ - cache: c, - requests: r, + cache: c, } } -func (m *MemStore) Get(ctx context.Context, addr Address) (*Chunk, error) { +func (m *MemStore) Get(_ context.Context, addr Address) (Chunk, error) { if m.disabled { return nil, ErrChunkNotFound } - m.mu.RLock() - defer m.mu.RUnlock() - - r, ok := m.requests.Get(string(addr)) - // it is a request - if ok { - return r.(*Chunk), nil - } - - // it is not a request c, ok := m.cache.Get(string(addr)) if !ok { return nil, ErrChunkNotFound } - return c.(*Chunk), nil + return c.(*chunk), nil } -func (m *MemStore) Put(ctx context.Context, c *Chunk) { +func (m *MemStore) Put(_ context.Context, c Chunk) error { if m.disabled { - return + return nil } - m.mu.Lock() - defer m.mu.Unlock() - - // it is a request - if c.ReqC != nil { - select { - case <-c.ReqC: - if c.GetErrored() != nil { - m.requests.Remove(string(c.Addr)) - return - } - m.cache.Add(string(c.Addr), c) - m.requests.Remove(string(c.Addr)) - default: - m.requests.Add(string(c.Addr), c) - } - return - } - - // it is 
not a request - m.cache.Add(string(c.Addr), c) - m.requests.Remove(string(c.Addr)) + m.cache.Add(string(c.Address()), c) + return nil } func (m *MemStore) setCapacity(n int) { if n <= 0 { m.disabled = true } else { - onEvicted := func(key interface{}, value interface{}) { - v := value.(*Chunk) - <-v.dbStoredC - } - c, err := lru.NewWithEvict(n, onEvicted) - if err != nil { - panic(err) - } - - r, err := lru.New(defaultChunkRequestsCacheCapacity) + c, err := lru.New(n) if err != nil { panic(err) } - m = &MemStore{ - cache: c, - requests: r, + *m = MemStore{ + cache: c, } } } diff --git a/swarm/storage/memstore_test.go b/swarm/storage/memstore_test.go index 2c1b0e89e..6b370d2b4 100644 --- a/swarm/storage/memstore_test.go +++ b/swarm/storage/memstore_test.go @@ -18,11 +18,6 @@ package storage import ( "context" - "crypto/rand" - "encoding/binary" - "io/ioutil" - "os" - "sync" "testing" "github.com/ethereum/go-ethereum/swarm/log" @@ -33,40 +28,32 @@ func newTestMemStore() *MemStore { return NewMemStore(storeparams, nil) } -func testMemStoreRandom(n int, processors int, chunksize int64, t *testing.T) { +func testMemStoreRandom(n int, chunksize int64, t *testing.T) { m := newTestMemStore() defer m.Close() - testStoreRandom(m, processors, n, chunksize, t) + testStoreRandom(m, n, chunksize, t) } -func testMemStoreCorrect(n int, processors int, chunksize int64, t *testing.T) { +func testMemStoreCorrect(n int, chunksize int64, t *testing.T) { m := newTestMemStore() defer m.Close() - testStoreCorrect(m, processors, n, chunksize, t) + testStoreCorrect(m, n, chunksize, t) } func TestMemStoreRandom_1(t *testing.T) { - testMemStoreRandom(1, 1, 0, t) + testMemStoreRandom(1, 0, t) } func TestMemStoreCorrect_1(t *testing.T) { - testMemStoreCorrect(1, 1, 4104, t) + testMemStoreCorrect(1, 4104, t) } -func TestMemStoreRandom_1_1k(t *testing.T) { - testMemStoreRandom(1, 1000, 0, t) +func TestMemStoreRandom_1k(t *testing.T) { + testMemStoreRandom(1000, 0, t) } -func 
TestMemStoreCorrect_1_1k(t *testing.T) { - testMemStoreCorrect(1, 100, 4096, t) -} - -func TestMemStoreRandom_8_1k(t *testing.T) { - testMemStoreRandom(8, 1000, 0, t) -} - -func TestMemStoreCorrect_8_1k(t *testing.T) { - testMemStoreCorrect(8, 1000, 4096, t) +func TestMemStoreCorrect_1k(t *testing.T) { + testMemStoreCorrect(100, 4096, t) } func TestMemStoreNotFound(t *testing.T) { @@ -82,13 +69,13 @@ func TestMemStoreNotFound(t *testing.T) { func benchmarkMemStorePut(n int, processors int, chunksize int64, b *testing.B) { m := newTestMemStore() defer m.Close() - benchmarkStorePut(m, processors, n, chunksize, b) + benchmarkStorePut(m, n, chunksize, b) } func benchmarkMemStoreGet(n int, processors int, chunksize int64, b *testing.B) { m := newTestMemStore() defer m.Close() - benchmarkStoreGet(m, processors, n, chunksize, b) + benchmarkStoreGet(m, n, chunksize, b) } func BenchmarkMemStorePut_1_500(b *testing.B) { @@ -107,104 +94,70 @@ func BenchmarkMemStoreGet_8_500(b *testing.B) { benchmarkMemStoreGet(500, 8, 4096, b) } -func newLDBStore(t *testing.T) (*LDBStore, func()) { - dir, err := ioutil.TempDir("", "bzz-storage-test") - if err != nil { - t.Fatal(err) - } - log.Trace("memstore.tempdir", "dir", dir) - - ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir) - db, err := NewLDBStore(ldbparams) - if err != nil { - t.Fatal(err) - } - - cleanup := func() { - db.Close() - err := os.RemoveAll(dir) - if err != nil { - t.Fatal(err) - } - } - - return db, cleanup -} - func TestMemStoreAndLDBStore(t *testing.T) { ldb, cleanup := newLDBStore(t) ldb.setCapacity(4000) defer cleanup() cacheCap := 200 - requestsCap := 200 - memStore := NewMemStore(NewStoreParams(4000, 200, 200, nil, nil), nil) + memStore := NewMemStore(NewStoreParams(4000, 200, nil, nil), nil) tests := []struct { - n int // number of chunks to push to memStore - chunkSize uint64 // size of chunk (by default in Swarm - 4096) - request bool // whether or not to set the ReqC channel on the random chunks + n 
int // number of chunks to push to memStore + chunkSize int64 // size of chunk (by default in Swarm - 4096) }{ { n: 1, chunkSize: 4096, - request: false, }, { n: 201, chunkSize: 4096, - request: false, }, { n: 501, chunkSize: 4096, - request: false, }, { n: 3100, chunkSize: 4096, - request: false, }, { n: 100, chunkSize: 4096, - request: true, }, } for i, tt := range tests { log.Info("running test", "idx", i, "tt", tt) - var chunks []*Chunk + var chunks []Chunk for i := 0; i < tt.n; i++ { - var c *Chunk - if tt.request { - c = NewRandomRequestChunk(tt.chunkSize) - } else { - c = NewRandomChunk(tt.chunkSize) - } - + c := GenerateRandomChunk(tt.chunkSize) chunks = append(chunks, c) } for i := 0; i < tt.n; i++ { - go ldb.Put(context.TODO(), chunks[i]) - memStore.Put(context.TODO(), chunks[i]) + err := ldb.Put(context.TODO(), chunks[i]) + if err != nil { + t.Fatal(err) + } + err = memStore.Put(context.TODO(), chunks[i]) + if err != nil { + t.Fatal(err) + } if got := memStore.cache.Len(); got > cacheCap { t.Fatalf("expected to get cache capacity less than %v, but got %v", cacheCap, got) } - if got := memStore.requests.Len(); got > requestsCap { - t.Fatalf("expected to get requests capacity less than %v, but got %v", requestsCap, got) - } } for i := 0; i < tt.n; i++ { - _, err := memStore.Get(context.TODO(), chunks[i].Addr) + _, err := memStore.Get(context.TODO(), chunks[i].Address()) if err != nil { if err == ErrChunkNotFound { - _, err := ldb.Get(context.TODO(), chunks[i].Addr) + _, err := ldb.Get(context.TODO(), chunks[i].Address()) if err != nil { t.Fatalf("couldn't get chunk %v from ldb, got error: %v", i, err) } @@ -213,37 +166,5 @@ func TestMemStoreAndLDBStore(t *testing.T) { } } } - - // wait for all chunks to be stored before ending the test are cleaning up - for i := 0; i < tt.n; i++ { - <-chunks[i].dbStoredC - } - } -} - -func NewRandomChunk(chunkSize uint64) *Chunk { - c := &Chunk{ - Addr: make([]byte, 32), - ReqC: nil, - SData: make([]byte, chunkSize+8), // 
SData should be chunkSize + 8 bytes reserved for length - dbStoredC: make(chan bool), - dbStoredMu: &sync.Mutex{}, } - - rand.Read(c.SData) - - binary.LittleEndian.PutUint64(c.SData[:8], chunkSize) - - hasher := MakeHashFunc(SHA3Hash)() - hasher.Write(c.SData) - copy(c.Addr, hasher.Sum(nil)) - - return c -} - -func NewRandomRequestChunk(chunkSize uint64) *Chunk { - c := NewRandomChunk(chunkSize) - c.ReqC = make(chan bool) - - return c } diff --git a/swarm/storage/mru/handler.go b/swarm/storage/mru/handler.go index 57561fd14..18c667f14 100644 --- a/swarm/storage/mru/handler.go +++ b/swarm/storage/mru/handler.go @@ -187,12 +187,12 @@ func (h *Handler) New(ctx context.Context, request *Request) error { return err } if request.metaHash != nil && !bytes.Equal(request.metaHash, metaHash) || - request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Addr) { + request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Address()) { return NewError(ErrInvalidValue, "metaHash in UpdateRequest does not match actual metadata") } request.metaHash = metaHash - request.rootAddr = chunk.Addr + request.rootAddr = chunk.Address() h.chunkStore.Put(ctx, chunk) log.Debug("new resource", "name", request.metadata.Name, "startTime", request.metadata.StartTime, "frequency", request.metadata.Frequency, "owner", request.metadata.Owner) @@ -202,14 +202,14 @@ func (h *Handler) New(ctx context.Context, request *Request) error { resourceUpdate: resourceUpdate{ updateHeader: updateHeader{ UpdateLookup: UpdateLookup{ - rootAddr: chunk.Addr, + rootAddr: chunk.Address(), }, }, }, ResourceMetadata: request.metadata, updated: time.Now(), } - h.set(chunk.Addr, rsrc) + h.set(chunk.Address(), rsrc) return nil } @@ -348,7 +348,11 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error return nil, NewErrorf(ErrPeriodDepth, "Lookup exceeded max period hops (%d)", lp.Limit) } updateAddr := lp.UpdateAddr() - chunk, err := h.chunkStore.GetWithTimeout(context.TODO(), 
updateAddr, defaultRetrieveTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) + defer cancel() + + chunk, err := h.chunkStore.Get(ctx, updateAddr) if err == nil { if specificversion { return h.updateIndex(rsrc, chunk) @@ -358,7 +362,11 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error for { newversion := lp.version + 1 updateAddr := lp.UpdateAddr() - newchunk, err := h.chunkStore.GetWithTimeout(context.TODO(), updateAddr, defaultRetrieveTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) + defer cancel() + + newchunk, err := h.chunkStore.Get(ctx, updateAddr) if err != nil { return h.updateIndex(rsrc, chunk) } @@ -380,7 +388,10 @@ func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error // Load retrieves the Mutable Resource metadata chunk stored at rootAddr // Upon retrieval it creates/updates the index entry for it with metadata corresponding to the chunk contents func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource, error) { - chunk, err := h.chunkStore.GetWithTimeout(ctx, rootAddr, defaultRetrieveTimeout) + //TODO: Maybe add timeout to context, defaultRetrieveTimeout? 
+ ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) + defer cancel() + chunk, err := h.chunkStore.Get(ctx, rootAddr) if err != nil { return nil, NewError(ErrNotFound, err.Error()) } @@ -388,11 +399,11 @@ func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource // create the index entry rsrc := &resource{} - if err := rsrc.ResourceMetadata.binaryGet(chunk.SData); err != nil { // Will fail if this is not really a metadata chunk + if err := rsrc.ResourceMetadata.binaryGet(chunk.Data()); err != nil { // Will fail if this is not really a metadata chunk return nil, err } - rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.SData) + rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.Data()) if !bytes.Equal(rsrc.rootAddr, rootAddr) { return nil, NewError(ErrCorruptData, "Corrupt metadata chunk") } @@ -402,17 +413,17 @@ func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource } // update mutable resource index map with specified content -func (h *Handler) updateIndex(rsrc *resource, chunk *storage.Chunk) (*resource, error) { +func (h *Handler) updateIndex(rsrc *resource, chunk storage.Chunk) (*resource, error) { // retrieve metadata from chunk data and check that it matches this mutable resource var r SignedResourceUpdate - if err := r.fromChunk(chunk.Addr, chunk.SData); err != nil { + if err := r.fromChunk(chunk.Address(), chunk.Data()); err != nil { return nil, err } - log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Addr, "period", r.period, "version", r.version) + log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Address(), "period", r.period, "version", r.version) // update our rsrcs entry map - rsrc.lastKey = chunk.Addr + rsrc.lastKey = chunk.Address() rsrc.period = r.period rsrc.version = r.version rsrc.updated = time.Now() @@ -420,8 +431,8 @@ func (h *Handler) updateIndex(rsrc *resource, chunk *storage.Chunk) (*resource, 
rsrc.multihash = r.multihash copy(rsrc.data, r.data) rsrc.Reader = bytes.NewReader(rsrc.data) - log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Addr, "period", rsrc.period, "version", rsrc.version) - h.set(chunk.Addr, rsrc) + log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Address(), "period", rsrc.period, "version", rsrc.version) + h.set(chunk.Address(), rsrc) return rsrc, nil } @@ -457,7 +468,7 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd // send the chunk h.chunkStore.Put(ctx, chunk) - log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.SData, "multihash", r.multihash) + log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.Data(), "multihash", r.multihash) // update our resources map entry if the new update is older than the one we have, if we have it. 
if rsrc != nil && (r.period > rsrc.period || (rsrc.period == r.period && r.version > rsrc.version)) { @@ -475,7 +486,7 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd // Retrieves the resource index value for the given nameHash func (h *Handler) get(rootAddr storage.Address) *resource { - if len(rootAddr) < storage.KeyLength { + if len(rootAddr) < storage.AddressLength { log.Warn("Handler.get with invalid rootAddr") return nil } @@ -488,7 +499,7 @@ func (h *Handler) get(rootAddr storage.Address) *resource { // Sets the resource index value for the given nameHash func (h *Handler) set(rootAddr storage.Address, rsrc *resource) { - if len(rootAddr) < storage.KeyLength { + if len(rootAddr) < storage.AddressLength { log.Warn("Handler.set with invalid rootAddr") return } diff --git a/swarm/storage/mru/lookup.go b/swarm/storage/mru/lookup.go index eb28336e1..b52cd5b4f 100644 --- a/swarm/storage/mru/lookup.go +++ b/swarm/storage/mru/lookup.go @@ -72,7 +72,7 @@ type UpdateLookup struct { // 4 bytes period // 4 bytes version // storage.Keylength for rootAddr -const updateLookupLength = 4 + 4 + storage.KeyLength +const updateLookupLength = 4 + 4 + storage.AddressLength // UpdateAddr calculates the resource update chunk address corresponding to this lookup key func (u *UpdateLookup) UpdateAddr() (updateAddr storage.Address) { @@ -90,7 +90,7 @@ func (u *UpdateLookup) binaryPut(serializedData []byte) error { if len(serializedData) != updateLookupLength { return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize UpdateLookup. 
Expected %d, got %d", updateLookupLength, len(serializedData)) } - if len(u.rootAddr) != storage.KeyLength { + if len(u.rootAddr) != storage.AddressLength { return NewError(ErrInvalidValue, "UpdateLookup.binaryPut called without rootAddr set") } binary.LittleEndian.PutUint32(serializedData[:4], u.period) @@ -111,7 +111,7 @@ func (u *UpdateLookup) binaryGet(serializedData []byte) error { } u.period = binary.LittleEndian.Uint32(serializedData[:4]) u.version = binary.LittleEndian.Uint32(serializedData[4:8]) - u.rootAddr = storage.Address(make([]byte, storage.KeyLength)) + u.rootAddr = storage.Address(make([]byte, storage.AddressLength)) copy(u.rootAddr[:], serializedData[8:]) return nil } diff --git a/swarm/storage/mru/metadata.go b/swarm/storage/mru/metadata.go index 0ab0ed1d9..509114895 100644 --- a/swarm/storage/mru/metadata.go +++ b/swarm/storage/mru/metadata.go @@ -142,7 +142,7 @@ func (r *ResourceMetadata) serializeAndHash() (rootAddr, metaHash []byte, chunkD } // creates a metadata chunk out of a resourceMetadata structure -func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []byte, err error) { +func (metadata *ResourceMetadata) newChunk() (chunk storage.Chunk, metaHash []byte, err error) { // the metadata chunk contains a timestamp of when the resource starts to be valid // and also how frequently it is expected to be updated // from this we know at what time we should look for updates, and how often @@ -157,9 +157,7 @@ func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []b } // make the chunk and send it to swarm - chunk = storage.NewChunk(rootAddr, nil) - chunk.SData = chunkData - chunk.Size = int64(len(chunkData)) + chunk = storage.NewChunk(rootAddr, chunkData) return chunk, metaHash, nil } diff --git a/swarm/storage/mru/request.go b/swarm/storage/mru/request.go index dd71f855d..af2ccf5c7 100644 --- a/swarm/storage/mru/request.go +++ b/swarm/storage/mru/request.go @@ -182,7 +182,7 @@ func (r *Request) 
fromJSON(j *updateRequestJSON) error { var declaredRootAddr storage.Address var declaredMetaHash []byte - declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.KeyLength, "rootAddr") + declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.AddressLength, "rootAddr") if err != nil { return err } diff --git a/swarm/storage/mru/resource_test.go b/swarm/storage/mru/resource_test.go index 76d7c58a1..0fb465bb0 100644 --- a/swarm/storage/mru/resource_test.go +++ b/swarm/storage/mru/resource_test.go @@ -87,8 +87,7 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { resourceUpdate: resourceUpdate{ updateHeader: updateHeader{ UpdateLookup: UpdateLookup{ - - rootAddr: make([]byte, 79), // put the wrong length, should be storage.KeyLength + rootAddr: make([]byte, 79), // put the wrong length, should be storage.AddressLength }, metaHash: nil, multihash: false, @@ -99,8 +98,8 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { if err == nil { t.Fatal("Expected newUpdateChunk to fail when rootAddr or metaHash have the wrong length") } - r.rootAddr = make([]byte, storage.KeyLength) - r.metaHash = make([]byte, storage.KeyLength) + r.rootAddr = make([]byte, storage.AddressLength) + r.metaHash = make([]byte, storage.AddressLength) _, err = r.toChunk() if err == nil { t.Fatal("Expected newUpdateChunk to fail when there is no data") @@ -197,7 +196,7 @@ func TestReverse(t *testing.T) { // check that we can recover the owner account from the update chunk's signature var checkUpdate SignedResourceUpdate - if err := checkUpdate.fromChunk(chunk.Addr, chunk.SData); err != nil { + if err := checkUpdate.fromChunk(chunk.Address(), chunk.Data()); err != nil { t.Fatal(err) } checkdigest, err := checkUpdate.GetDigest() @@ -215,8 +214,8 @@ func TestReverse(t *testing.T) { t.Fatalf("addresses dont match: %x != %x", originaladdress, recoveredaddress) } - if !bytes.Equal(key[:], chunk.Addr[:]) { - t.Fatalf("Expected chunk key '%x', was '%x'", key, chunk.Addr) + if 
!bytes.Equal(key[:], chunk.Address()[:]) { + t.Fatalf("Expected chunk key '%x', was '%x'", key, chunk.Address()) } if period != checkUpdate.period { t.Fatalf("Expected period '%d', was '%d'", period, checkUpdate.period) @@ -270,16 +269,16 @@ func TestResourceHandler(t *testing.T) { t.Fatal(err) } - chunk, err := rh.chunkStore.Get(context.TODO(), storage.Address(request.rootAddr)) + chunk, err := rh.chunkStore.Get(ctx, storage.Address(request.rootAddr)) if err != nil { t.Fatal(err) - } else if len(chunk.SData) < 16 { - t.Fatalf("chunk data must be minimum 16 bytes, is %d", len(chunk.SData)) + } else if len(chunk.Data()) < 16 { + t.Fatalf("chunk data must be minimum 16 bytes, is %d", len(chunk.Data())) } var recoveredMetadata ResourceMetadata - recoveredMetadata.binaryGet(chunk.SData) + recoveredMetadata.binaryGet(chunk.Data()) if err != nil { t.Fatal(err) } @@ -704,7 +703,7 @@ func TestValidator(t *testing.T) { if err != nil { t.Fatal(err) } - if !rh.Validate(chunk.Addr, chunk.SData) { + if !rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator fail on update chunk") } @@ -724,7 +723,7 @@ func TestValidator(t *testing.T) { t.Fatal(err) } - if rh.Validate(chunk.Addr, chunk.SData) { + if rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator did not fail on update chunk with false address") } @@ -742,7 +741,7 @@ func TestValidator(t *testing.T) { t.Fatal(err) } - if !rh.Validate(chunk.Addr, chunk.SData) { + if !rh.Validate(chunk.Address(), chunk.Data()) { t.Fatal("Chunk validator fail on metadata chunk") } } @@ -783,8 +782,7 @@ func TestValidatorInStore(t *testing.T) { // create content addressed chunks, one good, one faulty chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2) goodChunk := chunks[0] - badChunk := chunks[1] - badChunk.SData = goodChunk.SData + badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data()) metadata := &ResourceMetadata{ StartTime: startTime, @@ -801,7 +799,7 @@ func TestValidatorInStore(t 
*testing.T) { updateLookup := UpdateLookup{ period: 42, version: 1, - rootAddr: rootChunk.Addr, + rootAddr: rootChunk.Address(), } updateAddr := updateLookup.UpdateAddr() @@ -826,16 +824,16 @@ func TestValidatorInStore(t *testing.T) { } // put the chunks in the store and check their error status - storage.PutChunks(store, goodChunk) - if goodChunk.GetErrored() == nil { + err = store.Put(context.Background(), goodChunk) + if err == nil { t.Fatal("expected error on good content address chunk with resource validator only, but got nil") } - storage.PutChunks(store, badChunk) - if badChunk.GetErrored() == nil { + err = store.Put(context.Background(), badChunk) + if err == nil { t.Fatal("expected error on bad content address chunk with resource validator only, but got nil") } - storage.PutChunks(store, uglyChunk) - if err := uglyChunk.GetErrored(); err != nil { + err = store.Put(context.Background(), uglyChunk) + if err != nil { t.Fatalf("expected no error on resource update chunk with resource validator only, but got: %s", err) } } @@ -897,7 +895,7 @@ func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) { return nil, err } var r SignedResourceUpdate - if err := r.fromChunk(addr, chunk.SData); err != nil { + if err := r.fromChunk(addr, chunk.Data()); err != nil { return nil, err } return r.data, nil diff --git a/swarm/storage/mru/signedupdate.go b/swarm/storage/mru/signedupdate.go index 1c6d02e82..41a5a5e63 100644 --- a/swarm/storage/mru/signedupdate.go +++ b/swarm/storage/mru/signedupdate.go @@ -96,7 +96,7 @@ func (r *SignedResourceUpdate) Sign(signer Signer) error { } // create an update chunk. 
-func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) { +func (r *SignedResourceUpdate) toChunk() (storage.Chunk, error) { // Check that the update is signed and serialized // For efficiency, data is serialized during signature and cached in @@ -105,14 +105,11 @@ func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) { return nil, NewError(ErrInvalidSignature, "newUpdateChunk called without a valid signature or payload data. Call .Sign() first.") } - chunk := storage.NewChunk(r.updateAddr, nil) resourceUpdateLength := r.resourceUpdate.binaryLength() - chunk.SData = r.binaryData - // signature is the last item in the chunk data - copy(chunk.SData[resourceUpdateLength:], r.signature[:]) + copy(r.binaryData[resourceUpdateLength:], r.signature[:]) - chunk.Size = int64(len(chunk.SData)) + chunk := storage.NewChunk(r.updateAddr, r.binaryData) return chunk, nil } diff --git a/swarm/storage/mru/testutil.go b/swarm/storage/mru/testutil.go index 6efcba9ab..a30baaa1d 100644 --- a/swarm/storage/mru/testutil.go +++ b/swarm/storage/mru/testutil.go @@ -17,8 +17,12 @@ package mru import ( + "context" "fmt" "path/filepath" + "sync" + + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -35,6 +39,17 @@ func (t *TestHandler) Close() { t.chunkStore.Close() } +type mockNetFetcher struct{} + +func (m *mockNetFetcher) Request(ctx context.Context) { +} +func (m *mockNetFetcher) Offer(ctx context.Context, source *discover.NodeID) { +} + +func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher { + return &mockNetFetcher{} +} + // NewTestHandler creates Handler object to be used for testing purposes. 
func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) { path := filepath.Join(datadir, testDbDirName) @@ -47,7 +62,11 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) } localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(resourceHashAlgorithm))) localStore.Validators = append(localStore.Validators, rh) - netStore := storage.NewNetStore(localStore, nil) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, err + } + netStore.NewNetFetcherFunc = newFakeNetFetcher rh.SetStore(netStore) return &TestHandler{rh}, nil } diff --git a/swarm/storage/mru/updateheader.go b/swarm/storage/mru/updateheader.go index 3ac20c189..f0039eaf6 100644 --- a/swarm/storage/mru/updateheader.go +++ b/swarm/storage/mru/updateheader.go @@ -27,7 +27,7 @@ type updateHeader struct { metaHash []byte // SHA3 hash of the metadata chunk (less ownerAddr). Used to prove ownerhsip of the resource. 
} -const metaHashLength = storage.KeyLength +const metaHashLength = storage.AddressLength // updateLookupLength bytes // 1 byte flags (multihash bool for now) @@ -76,7 +76,7 @@ func (h *updateHeader) binaryGet(serializedData []byte) error { } cursor := updateLookupLength h.metaHash = make([]byte, metaHashLength) - copy(h.metaHash[:storage.KeyLength], serializedData[cursor:cursor+storage.KeyLength]) + copy(h.metaHash[:storage.AddressLength], serializedData[cursor:cursor+storage.AddressLength]) cursor += metaHashLength flags := serializedData[cursor] diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 96a7e51f7..de2d82d2b 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -18,181 +18,275 @@ package storage import ( "context" + "encoding/hex" + "fmt" + "sync" + "sync/atomic" "time" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/spancontext" - opentracing "github.com/opentracing/opentracing-go" + + lru "github.com/hashicorp/golang-lru" ) -var ( - // NetStore.Get timeout for get and get retries - // This is the maximum period that the Get will block. - // If it is reached, Get will return ErrChunkNotFound. - netStoreRetryTimeout = 30 * time.Second - // Minimal period between calling get method on NetStore - // on retry. It protects calling get very frequently if - // it returns ErrChunkNotFound very fast. - netStoreMinRetryDelay = 3 * time.Second - // Timeout interval before retrieval is timed out. - // It is used in NetStore.get on waiting for ReqC to be - // closed on a single retrieve request. - searchTimeout = 10 * time.Second +type ( + NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher ) -// NetStore implements the ChunkStore interface, -// this chunk access layer assumed 2 chunk stores -// local storage eg. 
LocalStore and network storage eg., NetStore -// access by calling network is blocking with a timeout +type NetFetcher interface { + Request(ctx context.Context) + Offer(ctx context.Context, source *discover.NodeID) +} + +// NetStore is an extension of local storage +// it implements the ChunkStore interface +// on request it initiates remote cloud retrieval using a fetcher +// fetchers are unique to a chunk and are stored in fetchers LRU memory cache +// fetchFuncFactory is a factory object to create a fetch function for a specific chunk address type NetStore struct { - localStore *LocalStore - retrieve func(ctx context.Context, chunk *Chunk) error + mu sync.Mutex + store SyncChunkStore + fetchers *lru.Cache + NewNetFetcherFunc NewNetFetcherFunc + closeC chan struct{} } -func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore { - return &NetStore{localStore, retrieve} +// NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a +// constructor function that can create a fetch function for a specific chunk address. +func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) { + fetchers, err := lru.New(defaultChunkRequestsCacheCapacity) + if err != nil { + return nil, err + } + return &NetStore{ + store: store, + fetchers: fetchers, + NewNetFetcherFunc: nnf, + closeC: make(chan struct{}), + }, nil } -// Get is the entrypoint for local retrieve requests -// waits for response or times out -// -// Get uses get method to retrieve request, but retries if the -// ErrChunkNotFound is returned by get, until the netStoreRetryTimeout -// is reached. 
-func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) { - - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "netstore.get.global") - defer sp.Finish() - - timer := time.NewTimer(netStoreRetryTimeout) - defer timer.Stop() - - // result and resultC provide results from the goroutine - // where NetStore.get is called. - type result struct { - chunk *Chunk - err error +// Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in +// the fetchers cache +func (n *NetStore) Put(ctx context.Context, ch Chunk) error { + n.mu.Lock() + defer n.mu.Unlock() + + // put to the chunk to the store, there should be no error + err := n.store.Put(ctx, ch) + if err != nil { + return err } - resultC := make(chan result) - - // quitC ensures that retring goroutine is terminated - // when this function returns. - quitC := make(chan struct{}) - defer close(quitC) - - // do retries in a goroutine so that the timer can - // force this method to return after the netStoreRetryTimeout. - go func() { - // limiter ensures that NetStore.get is not called more frequently - // then netStoreMinRetryDelay. If NetStore.get takes longer - // then netStoreMinRetryDelay, the next retry call will be - // without a delay. - limiter := time.NewTimer(netStoreMinRetryDelay) - defer limiter.Stop() - - for { - chunk, err := ns.get(ctx, addr, 0) - if err != ErrChunkNotFound { - // break retry only if the error is nil - // or other error then ErrChunkNotFound - select { - case <-quitC: - // Maybe NetStore.Get function has returned - // by the timer.C while we were waiting for the - // results. Terminate this goroutine. - case resultC <- result{chunk: chunk, err: err}: - // Send the result to the parrent goroutine. - } - return - - } - select { - case <-quitC: - // NetStore.Get function has returned, possibly - // by the timer.C, which makes this goroutine - // not needed. 
- return - case <-limiter.C: - } - // Reset the limiter for the next iteration. - limiter.Reset(netStoreMinRetryDelay) - log.Debug("NetStore.Get retry chunk", "key", addr) - } - }() - select { - case r := <-resultC: - return r.chunk, r.err - case <-timer.C: - return nil, ErrChunkNotFound + // if chunk is now put in the store, check if there was an active fetcher and call deliver on it + // (this delivers the chunk to requestors via the fetcher) + if f := n.getFetcher(ch.Address()); f != nil { + f.deliver(ctx, ch) + } + return nil +} + +// Get retrieves the chunk from the NetStore DPA synchronously. +// It calls NetStore.get, and if the chunk is not in local Storage +// it calls fetch with the request, which blocks until the chunk +// arrived or context is done +func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) { + chunk, fetch, err := n.get(rctx, ref) + if err != nil { + return nil, err + } + if chunk != nil { + return chunk, nil } + return fetch(rctx) } -// GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter -func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { - return ns.get(ctx, addr, timeout) +func (n *NetStore) BinIndex(po uint8) uint64 { + return n.store.BinIndex(po) } -func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) { - if timeout == 0 { - timeout = searchTimeout +func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error { + return n.store.Iterator(from, to, po, f) +} + +// FetchFunc returns nil if the store contains the given address. 
Otherwise it returns a wait function, +// which returns after the chunk is available or the context is done +func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error { + chunk, fetch, _ := n.get(ctx, ref) + if chunk != nil { + return nil + } + return func(ctx context.Context) error { + _, err := fetch(ctx) + return err } +} - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "netstore.get") - defer sp.Finish() +// Close chunk store +func (n *NetStore) Close() { + close(n.closeC) + n.store.Close() + // TODO: loop through fetchers to cancel them +} - if ns.retrieve == nil { - chunk, err = ns.localStore.Get(ctx, addr) - if err == nil { - return chunk, nil - } - if err != ErrFetching { - return nil, err - } - } else { - var created bool - chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr) +// get attempts at retrieving the chunk from LocalStore +// If it is not found then using getOrCreateFetcher: +// 1. Either there is already a fetcher to retrieve it +// 2. A new fetcher is created and saved in the fetchers cache +// From here on, all Get will hit on this fetcher until the chunk is delivered +// or all fetcher contexts are done. +// It returns a chunk, a fetcher function and an error +// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk. 
+func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) { + n.mu.Lock() + defer n.mu.Unlock() - if chunk.ReqC == nil { - return chunk, nil + chunk, err := n.store.Get(ctx, ref) + if err != nil { + if err != ErrChunkNotFound { + log.Debug("Received error from LocalStore other than ErrNotFound", "err", err) } + // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one + // if it doesn't exist yet + f := n.getOrCreateFetcher(ref) + // If the caller needs the chunk, it has to use the returned fetch function to get it + return nil, f.Fetch, nil + } - if created { - err := ns.retrieve(ctx, chunk) - if err != nil { - // mark chunk request as failed so that we can retry it later - chunk.SetErrored(ErrChunkUnavailable) - return nil, err - } - } + return chunk, nil, nil +} + +// getOrCreateFetcher attempts at retrieving an existing fetchers +// if none exists, creates one and saves it in the fetchers cache +// caller must hold the lock +func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher { + if f := n.getFetcher(ref); f != nil { + return f } - t := time.NewTicker(timeout) - defer t.Stop() + // no fetcher for the given address, we have to create a new one + key := hex.EncodeToString(ref) + // create the context during which fetching is kept alive + ctx, cancel := context.WithCancel(context.Background()) + // destroy is called when all requests finish + destroy := func() { + // remove fetcher from fetchers + n.fetchers.Remove(key) + // stop fetcher by cancelling context called when + // all requests cancelled/timedout or chunk is delivered + cancel() + } + // peers always stores all the peers which have an active request for the chunk. It is shared + // between fetcher and the NewFetchFunc function. It is needed by the NewFetchFunc because + // the peers which requested the chunk should not be requested to deliver it. 
+ peers := &sync.Map{} - select { - case <-t.C: - // mark chunk request as failed so that we can retry - chunk.SetErrored(ErrChunkNotFound) - return nil, ErrChunkNotFound - case <-chunk.ReqC: + fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC) + n.fetchers.Add(key, fetcher) + + return fetcher +} + +// getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists, +// otherwise it returns nil +func (n *NetStore) getFetcher(ref Address) *fetcher { + key := hex.EncodeToString(ref) + f, ok := n.fetchers.Get(key) + if ok { + return f.(*fetcher) } - chunk.SetErrored(nil) - return chunk, nil + return nil } -// Put is the entrypoint for local store requests coming from storeLoop -func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) { - ns.localStore.Put(ctx, chunk) +// RequestsCacheLen returns the current number of outgoing requests stored in the cache +func (n *NetStore) RequestsCacheLen() int { + return n.fetchers.Len() } -// Close chunk store -func (ns *NetStore) Close() { - ns.localStore.Close() +// One fetcher object is responsible to fetch one chunk for one address, and keep track of all the +// peers who have requested it and did not receive it yet. +type fetcher struct { + addr Address // address of chunk + chunk Chunk // fetcher can set the chunk on the fetcher + deliveredC chan struct{} // chan signalling chunk delivery to requests + cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore) + netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context + cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called + peers *sync.Map // the peers which asked for the chunk + requestCnt int32 // number of requests on this chunk. 
If all the requests are done (delivered or context is done) the cancel function is called + deliverOnce *sync.Once // guarantees that we only close deliveredC once +} + +// newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually +// does the retrieval (in non-test cases this is coming from the network package). cancel function is +// called either +// 1. when the chunk has been fetched all peers have been either notified or their context has been done +// 2. the chunk has not been fetched but all context from all the requests has been done +// The peers map stores all the peers which have requested chunk. +func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher { + cancelOnce := &sync.Once{} // cancel should only be called once + return &fetcher{ + addr: addr, + deliveredC: make(chan struct{}), + deliverOnce: &sync.Once{}, + cancelledC: closeC, + netFetcher: nf, + cancel: func() { + cancelOnce.Do(func() { + cancel() + }) + }, + peers: peers, + } +} + +// Fetch fetches the chunk synchronously, it is called by NetStore.Get is the chunk is not available +// locally. +func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { + atomic.AddInt32(&f.requestCnt, 1) + defer func() { + // if all the requests are done the fetcher can be cancelled + if atomic.AddInt32(&f.requestCnt, -1) == 0 { + f.cancel() + } + }() + + // The peer asking for the chunk. 
Store in the shared peers map, but delete after the request + // has been delivered + peer := rctx.Value("peer") + if peer != nil { + f.peers.Store(peer, time.Now()) + defer f.peers.Delete(peer) + } + + // If there is a source in the context then it is an offer, otherwise a request + sourceIF := rctx.Value("source") + if sourceIF != nil { + var source *discover.NodeID + id := discover.MustHexID(sourceIF.(string)) + source = &id + f.netFetcher.Offer(rctx, source) + } else { + f.netFetcher.Request(rctx) + } + + // wait until either the chunk is delivered or the context is done + select { + case <-rctx.Done(): + return nil, rctx.Err() + case <-f.deliveredC: + return f.chunk, nil + case <-f.cancelledC: + return nil, fmt.Errorf("fetcher cancelled") + } +} + +// deliver is called by NetStore.Put to notify all pending requests +func (f *fetcher) deliver(ctx context.Context, ch Chunk) { + f.deliverOnce.Do(func() { + f.chunk = ch + // closing the deliveredC channel will terminate ongoing requests + close(f.deliveredC) + }) } diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index 7babbf5e0..f08968f0e 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -17,107 +17,622 @@ package storage import ( + "bytes" "context" - "encoding/hex" - "errors" + "crypto/rand" "io/ioutil" + "sync" "testing" "time" - "github.com/ethereum/go-ethereum/swarm/network" -) + "github.com/ethereum/go-ethereum/p2p/discover" + ch "github.com/ethereum/go-ethereum/swarm/chunk" -var ( - errUnknown = errors.New("unknown error") + "github.com/ethereum/go-ethereum/common" ) -type mockRetrieve struct { - requests map[string]int +var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + +type mockNetFetcher struct { + peers *sync.Map + sources []*discover.NodeID + peersPerRequest [][]Address + requestCalled bool + offerCalled bool + quit 
<-chan struct{} + ctx context.Context +} + +func (m *mockNetFetcher) Offer(ctx context.Context, source *discover.NodeID) { + m.offerCalled = true + m.sources = append(m.sources, source) +} + +func (m *mockNetFetcher) Request(ctx context.Context) { + m.requestCalled = true + var peers []Address + m.peers.Range(func(key interface{}, _ interface{}) bool { + peers = append(peers, common.FromHex(key.(string))) + return true + }) + m.peersPerRequest = append(m.peersPerRequest, peers) +} + +type mockNetFetchFuncFactory struct { + fetcher *mockNetFetcher +} + +func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Address, peers *sync.Map) NetFetcher { + m.fetcher.peers = peers + m.fetcher.quit = ctx.Done() + m.fetcher.ctx = ctx + return m.fetcher +} + +func mustNewNetStore(t *testing.T) *NetStore { + netStore, _ := mustNewNetStoreWithFetcher(t) + return netStore } -func NewMockRetrieve() *mockRetrieve { - return &mockRetrieve{requests: make(map[string]int)} +func mustNewNetStoreWithFetcher(t *testing.T) (*NetStore, *mockNetFetcher) { + t.Helper() + + datadir, err := ioutil.TempDir("", "netstore") + if err != nil { + t.Fatal(err) + } + naddr := make([]byte, 32) + params := NewDefaultLocalStoreParams() + params.Init(datadir) + params.BaseKey = naddr + localStore, err := NewTestLocalStoreForAddr(params) + if err != nil { + t.Fatal(err) + } + + fetcher := &mockNetFetcher{} + mockNetFetchFuncFactory := &mockNetFetchFuncFactory{ + fetcher: fetcher, + } + netStore, err := NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher) + if err != nil { + t.Fatal(err) + } + return netStore, fetcher } -func newDummyChunk(addr Address) *Chunk { - chunk := NewChunk(addr, make(chan bool)) - chunk.SData = []byte{3, 4, 5} - chunk.Size = 3 +// TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put. +// After the Put there should no active fetchers, and the context created for the fetcher should +// be cancelled. 
+func TestNetStoreGetAndPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + c := make(chan struct{}) // this channel ensures that the gouroutine with the Put does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + }() + + close(c) + recChunk, err := netStore.Get(ctx, chunk.Address()) // this is blocked until the Put above is done + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // the chunk is already available locally, so there should be no active fetchers waiting for it + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } + + // A fetcher was created when the Get was called (and the chunk was not available). The chunk + // was delivered with the Put call, so the fetcher should be cancelled now. + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } - return chunk } -func (m *mockRetrieve) retrieve(ctx context.Context, chunk *Chunk) error { - hkey := hex.EncodeToString(chunk.Addr) - m.requests[hkey] += 1 +// TestNetStoreGetAndPut tests calling NetStore.Put and then NetStore.Get. 
+// After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore, +// there is no need to create fetchers. +func TestNetStoreGetAfterPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) - // on second call return error - if m.requests[hkey] == 2 { - return errUnknown + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // First we Put the chunk, so the chunk will be available locally + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) } - // on third call return data - if m.requests[hkey] == 3 { - *chunk = *newDummyChunk(chunk.Addr) + // Get should retrieve the chunk from LocalStore, without creating fetcher + recChunk, err := netStore.Get(ctx, chunk.Address()) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // no fetcher offer or request should be created for a locally available chunk + if fetcher.offerCalled || fetcher.requestCalled { + t.Fatal("NetFetcher.offerCalled or requestCalled not expected to be called") + } + // no fetchers should be created for a locally available chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } + +} + +// TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout +func TestNetStoreGetTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + c := make(chan struct{}) // this channel ensures that the gouroutine does not run earlier than the Get + go func() { + <-c // 
wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + }() + + close(c) + // We call Get on this chunk, which is not in LocalStore. We don't Put it at all, so there will + // be a timeout + _, err := netStore.Get(ctx, chunk.Address()) + + // Check if the timeout happened + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + // A fetcher was created, check if it has been removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // Check if the fetcher context has been cancelled after the timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks +// the errors +func TestNetStoreGetCancel(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + + c := make(chan struct{}) // this channel ensures that the gouroutine with the cancel does not run earlier than the Get + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to use a fetcher for the Get call") + } + cancel() + }() + + close(c) + // We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery 
+ _, err := netStore.Get(ctx, chunk.Address()) + + // After the context is cancelled above Get should return with an error + if err != context.Canceled { + t.Fatalf("Expected context.Canceled err got %v", err) + } + + // A fetcher was created, check if it has been removed after cancel + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after cancel") + } + + // Check if the fetcher context has been cancelled after the request context cancel + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreMultipleGetAndPut tests four Get calls for the same unavailable chunk. The chunk is +// delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher +// for the chunk retrieval +func TestNetStoreMultipleGetAndPut(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go func() { + // sleep to make sure Put is called after all the Get + time.Sleep(500 * time.Millisecond) + // check if netStore created exactly one fetcher for all Get calls + if netStore.fetchers.Len() != 1 { + t.Fatal("Expected netStore to use one fetcher for all Get calls") + } + err := netStore.Put(ctx, chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + }() + + // call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above. 
+ getWG := sync.WaitGroup{} + for i := 0; i < 4; i++ { + getWG.Add(1) go func() { - time.Sleep(100 * time.Millisecond) - close(chunk.ReqC) + defer getWG.Done() + recChunk, err := netStore.Get(ctx, chunk.Address()) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { + t.Fatalf("Different chunk received than what was put") + } }() + } + + finishedC := make(chan struct{}) + go func() { + getWG.Wait() + close(finishedC) + }() + + // The Get calls should return after Put, so no timeout expected + select { + case <-finishedC: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout waiting for Get calls to return") + } + + // A fetcher was created, check if it has been removed after cancel + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } - return nil + // A fetcher was created, check if it has been removed after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") } - return nil } -func TestNetstoreFailedRequest(t *testing.T) { - searchTimeout = 300 * time.Millisecond +// TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout +func TestNetStoreFetchFuncTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) - // setup - addr := network.RandomAddr() // tested peers peer address + chunk := GenerateRandomChunk(ch.DefaultSize) - // temp datadir - datadir, err := ioutil.TempDir("", "netstore") + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // FetchFunc is called for an unavaible chunk, so the returned wait function should not be nil + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should an active fetcher for the chunk after the FetchFunc 
call + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // wait function should timeout because we don't deliver the chunk with a Put + err := wait(ctx) + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + // the fetcher should be removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // the fetcher context should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk +func TestNetStoreFetchFuncAfterPut(t *testing.T) { + netStore := mustNewNetStore(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // We deliver the created the chunk with a Put + err := netStore.Put(ctx, chunk) if err != nil { - t.Fatal(err) + t.Fatalf("Expected no err got %v", err) } - params := NewDefaultLocalStoreParams() - params.Init(datadir) - params.BaseKey = addr.Over() - localStore, err := NewTestLocalStoreForAddr(params) + + // FetchFunc should return nil, because the chunk is available locally, no need to fetch it + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait != nil { + t.Fatal("Expected wait to be nil") + } + + // No fetchers should be created at all + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } +} + +// TestNetStoreGetCallsRequest tests if Get created a request on the NetFetcher for an unavailable chunk +func TestNetStoreGetCallsRequest(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := 
context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + _, err := netStore.Get(ctx, chunk.Address()) + + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadlineExceeded err got %v", err) + } + + // NetStore should call NetFetcher.Request and wait for the chunk + if !fetcher.requestCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } +} + +// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk +// in case of a source peer provided in the context. +func TestNetStoreGetCallsOffer(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + // If a source peer is added to the context, NetStore will handle it as an offer + ctx := context.WithValue(context.Background(), "source", sourcePeerID.String()) + ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + chunk, err := netStore.Get(ctx, chunk.Address()) + + if err != context.DeadlineExceeded { + t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err) + } + + // NetStore should call NetFetcher.Offer with the source peer + if !fetcher.offerCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } + + if len(fetcher.sources) != 1 { + t.Fatalf("Expected fetcher sources length 1 got %v", len(fetcher.sources)) + } + + if fetcher.sources[0].String() != sourcePeerID.String() { + t.Fatalf("Expected fetcher source %v got %v", sourcePeerID, fetcher.sources[0]) + } + +} + +// TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with peer in the context. 
+// There is no Put call, so the Get calls timeout +func TestNetStoreFetcherCountPeers(t *testing.T) { + + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + addr := randomAddr() + peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + errC := make(chan error) + nrGets := 3 + + // Call Get 3 times with a peer in context + for i := 0; i < nrGets; i++ { + peer := peers[i] + go func() { + ctx := context.WithValue(ctx, "peer", peer) + _, err := netStore.Get(ctx, addr) + errC <- err + }() + } + + // All 3 Get calls should timeout + for i := 0; i < nrGets; i++ { + err := <-errC + if err != context.DeadlineExceeded { + t.Fatalf("Expected \"%v\" error got \"%v\"", context.DeadlineExceeded, err) + } + } + + // fetcher should be closed after timeout + select { + case <-fetcher.quit: + case <-time.After(3 * time.Second): + t.Fatalf("mockNetFetcher not closed after timeout") + } + + // All 3 peers should be given to NetFetcher after the 3 Get calls + if len(fetcher.peersPerRequest) != nrGets { + t.Fatalf("Expected 3 got %v", len(fetcher.peersPerRequest)) + } + + for i, peers := range fetcher.peersPerRequest { + if len(peers) < i+1 { + t.Fatalf("Expected at least %v got %v", i+1, len(peers)) + } + } +} + +// TestNetStoreFetchFuncCalledMultipleTimes calls the wait function given by FetchFunc three times, +// and checks there is still exactly one fetcher for one chunk. Afthe chunk is delivered, it checks +// if the fetcher is closed. 
+func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) + + chunk := GenerateRandomChunk(ch.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // Call wait three times parallelly + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + err := wait(ctx) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + wg.Done() + }() + } + + // sleep a little so the wait functions are called above + time.Sleep(100 * time.Millisecond) + + // there should be still only one fetcher, because all wait calls are for the same chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatal("Expected netStore to have one fetcher for the requested chunk") + } + + // Deliver the chunk with a Put + err := netStore.Put(ctx, chunk) if err != nil { - t.Fatal(err) + t.Fatalf("Expected no err got %v", err) + } + + // wait until all wait calls return (because the chunk is delivered) + wg.Wait() + + // There should be no more fetchers for the delivered chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") } - r := NewMockRetrieve() - netStore := NewNetStore(localStore, r.retrieve) + // The context for the fetcher should be cancelled after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetcherLifeCycleWithTimeout is similar to 
TestNetStoreFetchFuncCalledMultipleTimes, +// the only difference is that we don't deilver the chunk, just wait for timeout +func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) { + netStore, fetcher := mustNewNetStoreWithFetcher(t) - key := Address{} + chunk := GenerateRandomChunk(ch.DefaultSize) - // first call is done by the retry on ErrChunkNotFound, no need to do it here - // _, err = netStore.Get(key) - // if err == nil || err != ErrChunkNotFound { - // t.Fatalf("expected to get ErrChunkNotFound, but got: %s", err) - // } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() - // second call - _, err = netStore.Get(context.TODO(), key) - if got := r.requests[hex.EncodeToString(key)]; got != 2 { - t.Fatalf("expected to have called retrieve two times, but got: %v", got) + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") } - if err != errUnknown { - t.Fatalf("expected to get an unknown error, but got: %s", err) + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") } - // third call - chunk, err := netStore.Get(context.TODO(), key) - if got := r.requests[hex.EncodeToString(key)]; got != 3 { - t.Fatalf("expected to have called retrieve three times, but got: %v", got) + // Call wait three times parallelly + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer rcancel() + err := wait(rctx) + if err != context.DeadlineExceeded { + t.Fatalf("Expected err %v got %v", context.DeadlineExceeded, err) + } + }() } - if err != nil || chunk == nil { - t.Fatalf("expected to get a chunk but 
got: %v, %s", chunk, err) + + // wait until all wait calls timeout + wg.Wait() + + // There should be no more fetchers after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") } - if len(chunk.SData) != 3 { - t.Fatalf("expected to get a chunk with size 3, but got: %v", chunk.SData) + + // The context for the fetcher should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") } } + +func randomAddr() Address { + addr := make([]byte, 32) + rand.Read(addr) + return Address(addr) +} diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 36ff66d04..f74eef06b 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" ) @@ -57,7 +57,7 @@ import ( When certain no of data chunks are created (defaultBranches), a signal is sent to create a tree entry. When the level 0 tree entries reaches certain threshold (defaultBranches), another signal is sent to a tree entry one level up.. and so on... until only the data is exhausted AND only one - tree entry is present in certain level. The key of tree entry is given out as the rootKey of the file. + tree entry is present in certain level. The key of tree entry is given out as the rootAddress of the file. */ @@ -98,15 +98,15 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get } /* - When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes. + When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Address), the root hash of the entire content will fill this once processing finishes. 
New chunks to store are store using the putter which the caller provides. */ func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, ch.DefaultSize)).Split(ctx) } func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, ch.DefaultSize)).Append(ctx) } // Entry to create a tree node @@ -153,7 +153,7 @@ type PyramidChunker struct { wg *sync.WaitGroup errC chan error quitC chan bool - rootKey []byte + rootAddress []byte chunkLevel [][]*TreeEntry } @@ -171,14 +171,14 @@ func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { pc.wg = &sync.WaitGroup{} pc.errC = make(chan error) pc.quitC = make(chan bool) - pc.rootKey = make([]byte, pc.hashSize) + pc.rootAddress = make([]byte, pc.hashSize) pc.chunkLevel = make([][]*TreeEntry, pc.branches) return } func (pc *PyramidChunker) Join(addr Address, getter Getter, depth int) LazySectionReader { return &LazyChunkReader{ - key: addr, + addr: addr, depth: depth, chunkSize: pc.chunkSize, branches: pc.branches, @@ -209,7 +209,7 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte log.Debug("pyramid.chunker: Split()") pc.wg.Add(1) - pc.prepareChunks(false) + pc.prepareChunks(ctx, false) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -231,19 +231,21 @@ func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(conte if err != nil { return nil, nil, err } - case 
<-time.NewTimer(splitTimeout).C: + case <-ctx.Done(): + _ = pc.putter.Wait(ctx) //??? + return nil, nil, ctx.Err() } - return pc.rootKey, pc.putter.Wait, nil + return pc.rootAddress, pc.putter.Wait, nil } func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) { log.Debug("pyramid.chunker: Append()") // Load the right most unfinished tree chunks in every level - pc.loadTree() + pc.loadTree(ctx) pc.wg.Add(1) - pc.prepareChunks(true) + pc.prepareChunks(ctx, true) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -265,11 +267,11 @@ func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(cont case <-time.NewTimer(splitTimeout).C: } - return pc.rootKey, pc.putter.Wait, nil + return pc.rootAddress, pc.putter.Wait, nil } -func (pc *PyramidChunker) processor(id int64) { +func (pc *PyramidChunker) processor(ctx context.Context, id int64) { defer pc.decrementWorkerCount() for { select { @@ -278,19 +280,22 @@ func (pc *PyramidChunker) processor(id int64) { if !ok { return } - pc.processChunk(id, job) + pc.processChunk(ctx, id, job) case <-pc.quitC: return } } } -func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { +func (pc *PyramidChunker) processChunk(ctx context.Context, id int64, job *chunkJob) { log.Debug("pyramid.chunker: processChunk()", "id", id) - ref, err := pc.putter.Put(context.TODO(), job.chunk) + ref, err := pc.putter.Put(ctx, job.chunk) if err != nil { - pc.errC <- err + select { + case pc.errC <- err: + case <-pc.quitC: + } } // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk) @@ -300,14 +305,14 @@ func (pc *PyramidChunker) processChunk(id int64, job *chunkJob) { job.parentWg.Done() } -func (pc *PyramidChunker) loadTree() error { +func (pc *PyramidChunker) loadTree(ctx context.Context) error { log.Debug("pyramid.chunker: loadTree()") // Get the root chunk to get the total size - 
chunkData, err := pc.getter.Get(context.TODO(), Reference(pc.key)) + chunkData, err := pc.getter.Get(ctx, Reference(pc.key)) if err != nil { return errLoadingTreeRootChunk } - chunkSize := chunkData.Size() + chunkSize := int64(chunkData.Size()) log.Trace("pyramid.chunker: root chunk", "chunk.Size", chunkSize, "pc.chunkSize", pc.chunkSize) //if data size is less than a chunk... add a parent with update as pending @@ -356,7 +361,7 @@ func (pc *PyramidChunker) loadTree() error { branchCount = int64(len(ent.chunk)-8) / pc.hashSize for i := int64(0); i < branchCount; i++ { key := ent.chunk[8+(i*pc.hashSize) : 8+((i+1)*pc.hashSize)] - newChunkData, err := pc.getter.Get(context.TODO(), Reference(key)) + newChunkData, err := pc.getter.Get(ctx, Reference(key)) if err != nil { return errLoadingTreeChunk } @@ -365,7 +370,7 @@ func (pc *PyramidChunker) loadTree() error { newEntry := &TreeEntry{ level: lvl - 1, branchCount: bewBranchCount, - subtreeSize: uint64(newChunkSize), + subtreeSize: newChunkSize, chunk: newChunkData, key: key, index: 0, @@ -385,7 +390,7 @@ func (pc *PyramidChunker) loadTree() error { return nil } -func (pc *PyramidChunker) prepareChunks(isAppend bool) { +func (pc *PyramidChunker) prepareChunks(ctx context.Context, isAppend bool) { log.Debug("pyramid.chunker: prepareChunks", "isAppend", isAppend) defer pc.wg.Done() @@ -393,11 +398,11 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { pc.incrementWorkerCount() - go pc.processor(pc.workerCount) + go pc.processor(ctx, pc.workerCount) parent := NewTreeEntry(pc) var unfinishedChunkData ChunkData - var unfinishedChunkSize int64 + var unfinishedChunkSize uint64 if isAppend && len(pc.chunkLevel[0]) != 0 { lastIndex := len(pc.chunkLevel[0]) - 1 @@ -415,16 +420,16 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { } lastBranch := parent.branchCount - 1 - lastKey := parent.chunk[8+lastBranch*pc.hashSize : 8+(lastBranch+1)*pc.hashSize] + lastAddress := parent.chunk[8+lastBranch*pc.hashSize : 
8+(lastBranch+1)*pc.hashSize] var err error - unfinishedChunkData, err = pc.getter.Get(context.TODO(), lastKey) + unfinishedChunkData, err = pc.getter.Get(ctx, lastAddress) if err != nil { pc.errC <- err } unfinishedChunkSize = unfinishedChunkData.Size() - if unfinishedChunkSize < pc.chunkSize { - parent.subtreeSize = parent.subtreeSize - uint64(unfinishedChunkSize) + if unfinishedChunkSize < uint64(pc.chunkSize) { + parent.subtreeSize = parent.subtreeSize - unfinishedChunkSize parent.branchCount = parent.branchCount - 1 } else { unfinishedChunkData = nil @@ -468,8 +473,8 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { if parent.branchCount == 1 && (pc.depth() == 0 || isAppend) { // Data is exactly one chunk.. pick the last chunk key as root chunkWG.Wait() - lastChunksKey := parent.chunk[8 : 8+pc.hashSize] - copy(pc.rootKey, lastChunksKey) + lastChunksAddress := parent.chunk[8 : 8+pc.hashSize] + copy(pc.rootAddress, lastChunksAddress) break } } else { @@ -502,7 +507,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { // No need to build the tree if the depth is 0 // or we are appending. // Just use the last key. - copy(pc.rootKey, pkey) + copy(pc.rootAddress, pkey) } else { // We need to build the tree and and provide the lonely // chunk key to replace the last tree chunk key. 
@@ -525,7 +530,7 @@ func (pc *PyramidChunker) prepareChunks(isAppend bool) { workers := pc.getWorkerCount() if int64(len(pc.jobC)) > workers && workers < ChunkProcessors { pc.incrementWorkerCount() - go pc.processor(pc.workerCount) + go pc.processor(ctx, pc.workerCount) } } @@ -558,7 +563,7 @@ func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync lvlCount := int64(len(pc.chunkLevel[lvl])) if lvlCount == 1 && last { - copy(pc.rootKey, pc.chunkLevel[lvl][0].key) + copy(pc.rootAddress, pc.chunkLevel[lvl][0].key) return } diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 53e3af485..bc2af2cd7 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -25,16 +25,16 @@ import ( "fmt" "hash" "io" - "sync" + "io/ioutil" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/swarm/bmt" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" ) const MaxPO = 16 -const KeyLength = 32 +const AddressLength = 32 type Hasher func() hash.Hash type SwarmHasher func() SwarmHash @@ -116,7 +116,7 @@ func MakeHashFunc(hash string) SwarmHasher { return func() SwarmHash { hasher := sha3.NewKeccak256 hasherSize := hasher().Size() - segmentCount := chunk.DefaultSize / hasherSize + segmentCount := ch.DefaultSize / hasherSize pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize) return bmt.New(pool) } @@ -169,88 +169,88 @@ func (c AddressCollection) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -// Chunk also serves as a request object passed to ChunkStores -// in case it is a retrieval request, Data is nil and Size is 0 -// Note that Size is not the size of the data chunk, which is Data.Size() -// but the size of the subtree encoded in the chunk -// 0 if request, to be supplied by the dpa -type Chunk struct { - Addr Address // always - SData []byte // nil if request, to be supplied by dpa - Size int64 // size of the data covered by 
the subtree encoded in this chunk - //Source Peer // peer - C chan bool // to signal data delivery by the dpa - ReqC chan bool // to signal the request done - dbStoredC chan bool // never remove a chunk from memStore before it is written to dbStore - dbStored bool - dbStoredMu *sync.Mutex - errored error // flag which is set when the chunk request has errored or timeouted - erroredMu sync.Mutex +// Chunk interface implemented by context.Contexts and data chunks +type Chunk interface { + Address() Address + Payload() []byte + SpanBytes() []byte + Span() int64 + Data() []byte } -func (c *Chunk) SetErrored(err error) { - c.erroredMu.Lock() - defer c.erroredMu.Unlock() +type chunk struct { + addr Address + sdata []byte + span int64 +} - c.errored = err +func NewChunk(addr Address, data []byte) *chunk { + return &chunk{ + addr: addr, + sdata: data, + span: -1, + } } -func (c *Chunk) GetErrored() error { - c.erroredMu.Lock() - defer c.erroredMu.Unlock() +func (c *chunk) Address() Address { + return c.addr +} - return c.errored +func (c *chunk) SpanBytes() []byte { + return c.sdata[:8] } -func NewChunk(addr Address, reqC chan bool) *Chunk { - return &Chunk{ - Addr: addr, - ReqC: reqC, - dbStoredC: make(chan bool), - dbStoredMu: &sync.Mutex{}, +func (c *chunk) Span() int64 { + if c.span == -1 { + c.span = int64(binary.LittleEndian.Uint64(c.sdata[:8])) } + return c.span } -func (c *Chunk) markAsStored() { - c.dbStoredMu.Lock() - defer c.dbStoredMu.Unlock() - - if !c.dbStored { - close(c.dbStoredC) - c.dbStored = true - } +func (c *chunk) Data() []byte { + return c.sdata } -func (c *Chunk) WaitToStore() error { - <-c.dbStoredC - return c.GetErrored() +func (c *chunk) Payload() []byte { + return c.sdata[8:] } -func GenerateRandomChunk(dataSize int64) *Chunk { - return GenerateRandomChunks(dataSize, 1)[0] +// String() for pretty printing +func (self *chunk) String() string { + return fmt.Sprintf("Address: %v TreeSize: %v Chunksize: %v", self.addr.Log(), self.span, 
len(self.sdata)) } -func GenerateRandomChunks(dataSize int64, count int) (chunks []*Chunk) { - var i int +func GenerateRandomChunk(dataSize int64) Chunk { hasher := MakeHashFunc(DefaultHash)() - if dataSize > chunk.DefaultSize { - dataSize = chunk.DefaultSize - } + sdata := make([]byte, dataSize+8) + rand.Read(sdata[8:]) + binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize)) + hasher.ResetWithLength(sdata[:8]) + hasher.Write(sdata[8:]) + return NewChunk(hasher.Sum(nil), sdata) +} - for i = 0; i < count; i++ { - chunks = append(chunks, NewChunk(nil, nil)) - chunks[i].SData = make([]byte, dataSize+8) - rand.Read(chunks[i].SData) - binary.LittleEndian.PutUint64(chunks[i].SData[:8], uint64(dataSize)) - hasher.ResetWithLength(chunks[i].SData[:8]) - hasher.Write(chunks[i].SData[8:]) - chunks[i].Addr = make([]byte, 32) - copy(chunks[i].Addr, hasher.Sum(nil)) +func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) { + if dataSize > ch.DefaultSize { + dataSize = ch.DefaultSize + } + for i := 0; i < count; i++ { + ch := GenerateRandomChunk(ch.DefaultSize) + chunks = append(chunks, ch) } - return chunks } +func GenerateRandomData(l int) (r io.Reader, slice []byte) { + slice, err := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(l))) + if err != nil { + panic("rand error") + } + // log.Warn("generate random data", "len", len(slice), "data", common.Bytes2Hex(slice)) + r = io.LimitReader(bytes.NewReader(slice), int64(l)) + return r, slice +} + // Size, Seek, Read, ReadAt type LazySectionReader interface { Context() context.Context @@ -273,18 +273,17 @@ func (r *LazyTestSectionReader) Context() context.Context { } type StoreParams struct { - Hash SwarmHasher `toml:"-"` - DbCapacity uint64 - CacheCapacity uint - ChunkRequestsCacheCapacity uint - BaseKey []byte + Hash SwarmHasher `toml:"-"` + DbCapacity uint64 + CacheCapacity uint + BaseKey []byte } func NewDefaultStoreParams() *StoreParams { - return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, 
defaultChunkRequestsCacheCapacity, nil, nil) + return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, nil, nil) } -func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHasher, basekey []byte) *StoreParams { +func NewStoreParams(ldbCap uint64, cacheCap uint, hash SwarmHasher, basekey []byte) *StoreParams { if basekey == nil { basekey = make([]byte, 32) } @@ -292,11 +291,10 @@ func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHa hash = MakeHashFunc(DefaultHash) } return &StoreParams{ - Hash: hash, - DbCapacity: ldbCap, - CacheCapacity: cacheCap, - ChunkRequestsCacheCapacity: requestsCap, - BaseKey: basekey, + Hash: hash, + DbCapacity: ldbCap, + CacheCapacity: cacheCap, + BaseKey: basekey, } } @@ -321,8 +319,8 @@ type Getter interface { } // NOTE: this returns invalid data if chunk is encrypted -func (c ChunkData) Size() int64 { - return int64(binary.LittleEndian.Uint64(c[:8])) +func (c ChunkData) Size() uint64 { + return binary.LittleEndian.Uint64(c[:8]) } func (c ChunkData) Data() []byte { @@ -348,7 +346,8 @@ func NewContentAddressValidator(hasher SwarmHasher) *ContentAddressValidator { // Validate that the given key is a valid content address for the given data func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool { - if l := len(data); l < 9 || l > chunk.DefaultSize+8 { + if l := len(data); l < 9 || l > ch.DefaultSize+8 { + // log.Error("invalid chunk size", "chunk", addr.Hex(), "size", l) return false } @@ -359,3 +358,37 @@ func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool { return bytes.Equal(hash, addr[:]) } + +type ChunkStore interface { + Put(ctx context.Context, ch Chunk) (err error) + Get(rctx context.Context, ref Address) (ch Chunk, err error) + Close() +} + +// SyncChunkStore is a ChunkStore which supports syncing +type SyncChunkStore interface { + ChunkStore + BinIndex(po uint8) uint64 + Iterator(from uint64, to uint64, po uint8, f func(Address, 
uint64) bool) error + FetchFunc(ctx context.Context, ref Address) func(context.Context) error +} + +// FakeChunkStore doesn't store anything, just implements the ChunkStore interface +// It can be used to inject into a hasherStore if you don't want to actually store data just do the +// hashing +type FakeChunkStore struct { +} + +// Put doesn't store anything it is just here to implement ChunkStore +func (f *FakeChunkStore) Put(_ context.Context, ch Chunk) error { + return nil +} + +// Gut doesn't store anything it is just here to implement ChunkStore +func (f *FakeChunkStore) Get(_ context.Context, ref Address) (Chunk, error) { + panic("FakeChunkStore doesn't support Get") +} + +// Close doesn't store anything it is just here to implement ChunkStore +func (f *FakeChunkStore) Close() { +} diff --git a/swarm/swarm.go b/swarm/swarm.go index baf71b962..13aa1125d 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -75,8 +75,8 @@ type Swarm struct { privateKey *ecdsa.PrivateKey corsString string swapEnabled bool - lstore *storage.LocalStore // local store, needs to store for releasing resources after node stopped - sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit + netStore *storage.NetStore + sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit ps *pss.Pss tracerClose io.Closer @@ -164,37 +164,40 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e self.dns = resolver } - self.lstore, err = storage.NewLocalStore(config.LocalStoreParams, mockStore) + lstore, err := storage.NewLocalStore(config.LocalStoreParams, mockStore) if err != nil { - return + return nil, err + } + + self.netStore, err = storage.NewNetStore(lstore, nil) + if err != nil { + return nil, err } - db := storage.NewDBAPI(self.lstore) to := network.NewKademlia( common.FromHex(config.BzzKey), network.NewKadParams(), ) - delivery := stream.NewDelivery(to, db) + delivery := stream.NewDelivery(to, self.netStore) + 
self.netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, config.DeliverySkipCheck).New - self.streamer = stream.NewRegistry(addr, delivery, db, stateStore, &stream.RegistryOptions{ - SkipCheck: config.DeliverySkipCheck, + self.streamer = stream.NewRegistry(addr, delivery, self.netStore, stateStore, &stream.RegistryOptions{ + SkipCheck: config.SyncingSkipCheck, DoSync: config.SyncEnabled, DoRetrieve: true, SyncUpdateDelay: config.SyncUpdateDelay, }) - // set up NetStore, the cloud storage local access layer - netStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage - self.fileStore = storage.NewFileStore(netStore, self.config.FileStoreParams) + self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams) var resourceHandler *mru.Handler rhparams := &mru.HandlerParams{} resourceHandler = mru.NewHandler(rhparams) - resourceHandler.SetStore(netStore) + resourceHandler.SetStore(self.netStore) - self.lstore.Validators = []storage.ChunkValidator{ + lstore.Validators = []storage.ChunkValidator{ storage.NewContentAddressValidator(storage.MakeHashFunc(storage.DefaultHash)), resourceHandler, } @@ -399,7 +402,7 @@ func (self *Swarm) periodicallyUpdateGauges() { func (self *Swarm) updateGauges() { uptimeGauge.Update(time.Since(startTime).Nanoseconds()) - requestsCacheGauge.Update(int64(self.lstore.RequestsCacheLen())) + requestsCacheGauge.Update(int64(self.netStore.RequestsCacheLen())) } // implements the node.Service interface @@ -420,8 +423,8 @@ func (self *Swarm) Stop() error { ch.Save() } - if self.lstore != nil { - self.lstore.DbStore.Close() + if self.netStore != nil { + self.netStore.Close() } self.sfs.Stop() stopCounter.Inc(1) @@ -478,21 +481,6 @@ func (self *Swarm) APIs() []rpc.API { Service: self.sfs, Public: false, }, - // storage APIs - // DEPRECATED: Use the HTTP API instead - { - Namespace: "bzz", - Version: "0.1", - Service: 
api.NewStorage(self.api), - Public: true, - }, - { - Namespace: "bzz", - Version: "0.1", - Service: api.NewFileSystem(self.api), - Public: false, - }, - // {Namespace, Version, api.NewAdmin(self), false}, } apis = append(apis, self.bzz.APIs()...) diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index 0827748ae..c6569e37b 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -82,8 +82,8 @@ func TestNewSwarm(t *testing.T) { if s.dns != nil { t.Error("dns initialized, but it should not be") } - if s.lstore == nil { - t.Error("localstore not initialized") + if s.netStore == nil { + t.Error("netStore not initialized") } if s.streamer == nil { t.Error("streamer not initialized") @@ -91,9 +91,6 @@ func TestNewSwarm(t *testing.T) { if s.fileStore == nil { t.Error("fileStore not initialized") } - if s.lstore.Validators == nil { - t.Error("localstore validators not initialized") - } if s.bzz == nil { t.Error("bzz not initialized") } |