diff options
author | Balint Gabor <balint.g@gmail.com> | 2018-09-13 17:42:19 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-13 17:42:19 +0800 |
commit | 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch) | |
tree | 62a2896b3b824449595272f0b92dda877ba1c58d /swarm/network | |
parent | ff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff) | |
download | dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip |
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org>
Co-authored-by: Balint Gabor <balint.g@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/fetcher.go | 305 | ||||
-rw-r--r-- | swarm/network/fetcher_test.go | 459 | ||||
-rw-r--r-- | swarm/network/priorityqueue/priorityqueue.go | 38 | ||||
-rw-r--r-- | swarm/network/priorityqueue/priorityqueue_test.go | 6 | ||||
-rw-r--r-- | swarm/network/simulation/simulation.go | 16 | ||||
-rw-r--r-- | swarm/network/stream/common_test.go | 17 | ||||
-rw-r--r-- | swarm/network/stream/delivery.go | 231 | ||||
-rw-r--r-- | swarm/network/stream/delivery_test.go | 93 | ||||
-rw-r--r-- | swarm/network/stream/intervals_test.go | 78 | ||||
-rw-r--r-- | swarm/network/stream/messages.go | 87 | ||||
-rw-r--r-- | swarm/network/stream/peer.go | 51 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_retrieval_test.go | 46 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 170 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 28 | ||||
-rw-r--r-- | swarm/network/stream/streamer_test.go | 8 | ||||
-rw-r--r-- | swarm/network/stream/syncer.go | 94 | ||||
-rw-r--r-- | swarm/network/stream/syncer_test.go | 32 |
17 files changed, 1293 insertions, 466 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go new file mode 100644 index 000000000..35e2f0132 --- /dev/null +++ b/swarm/network/fetcher.go @@ -0,0 +1,305 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +var searchTimeout = 1 * time.Second + +// Time to consider peer to be skipped. +// Also used in stream delivery. +var RequestTimeout = 10 * time.Second + +type RequestFunc func(context.Context, *Request) (*discover.NodeID, chan struct{}, error) + +// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and +// keeps it alive until all active requests are completed. This can happen: +// 1. either because the chunk is delivered +// 2. or becuse the requestor cancelled/timed out +// Fetcher self destroys itself after it is completed. +// TODO: cancel all forward requests after termination +type Fetcher struct { + protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk + addr storage.Address // the address of the chunk to be fetched + offerC chan *discover.NodeID // channel of sources (peer node id strings) + requestC chan struct{} + skipCheck bool +} + +type Request struct { + Addr storage.Address // chunk address + Source *discover.NodeID // nodeID of peer to request from (can be nil) + SkipCheck bool // whether to offer the chunk first or deliver directly + peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil) +} + +// NewRequest returns a new instance of Request based on chunk address skip check and +// a map of peers to skip. +func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request { + return &Request{ + Addr: addr, + SkipCheck: skipCheck, + peersToSkip: peersToSkip, + } +} + +// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk. +// Peers to skip are kept per Request and for a time period of RequestTimeout. +// This function is used in stream package in Delivery.RequestFromPeers to optimize +// requests for chunks. +func (r *Request) SkipPeer(nodeID string) bool { + val, ok := r.peersToSkip.Load(nodeID) + if !ok { + return false + } + t, ok := val.(time.Time) + if ok && time.Now().After(t.Add(RequestTimeout)) { + // deadine expired + r.peersToSkip.Delete(nodeID) + return false + } + return true +} + +// FetcherFactory is initialised with a request function and can create fetchers +type FetcherFactory struct { + request RequestFunc + skipCheck bool +} + +// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory +func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { + return &FetcherFactory{ + request: request, + skipCheck: skipCheck, + } +} + +// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to +// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting +// this chunk, to make sure we don't request back the chunks from them. +// The created Fetcher is started and returned. +func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher { + fetcher := NewFetcher(source, f.request, f.skipCheck) + go fetcher.run(ctx, peersToSkip) + return fetcher +} + +// NewFetcher creates a new Fetcher for the given chunk address using the given request function. +func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { + return &Fetcher{ + addr: addr, + protoRequestFunc: rf, + offerC: make(chan *discover.NodeID), + requestC: make(chan struct{}), + skipCheck: skipCheck, + } +} + +// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally. +func (f *Fetcher) Offer(ctx context.Context, source *discover.NodeID) { + // First we need to have this select to make sure that we return if context is done + select { + case <-ctx.Done(): + return + default: + } + + // This select alone would not guarantee that we return of context is done, it could potentially + // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.offerC <- source: + case <-ctx.Done(): + } +} + +// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally. +func (f *Fetcher) Request(ctx context.Context) { + // First we need to have this select to make sure that we return if context is done + select { + case <-ctx.Done(): + return + default: + } + + // This select alone would not guarantee that we return of context is done, it could potentially + // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.requestC <- struct{}{}: + case <-ctx.Done(): + } +} + +// start prepares the Fetcher +// it keeps the Fetcher alive within the lifecycle of the passed context +func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { + var ( + doRequest bool // determines if retrieval is initiated in the current iteration + wait *time.Timer // timer for search timeout + waitC <-chan time.Time // timer channel + sources []*discover.NodeID // known sources, ie. peers that offered the chunk + requested bool // true if the chunk was actually requested + ) + gone := make(chan *discover.NodeID) // channel to signal that a peer we requested from disconnected + + // loop that keeps the fetching process alive + // after every request a timer is set. If this goes off we request again from another peer + // note that the previous request is still alive and has the chance to deliver, so + // rerequesting extends the search. ie., + // if a peer we requested from is gone we issue a new request, so the number of active + // requests never decreases + for { + select { + + // incoming offer + case source := <-f.offerC: + log.Trace("new source", "peer addr", source, "request addr", f.addr) + // 1) the chunk is offered by a syncing peer + // add to known sources + sources = append(sources, source) + // launch a request to the source iff the chunk was requested (not just expected because its offered by a syncing peer) + doRequest = requested + + // incoming request + case <-f.requestC: + log.Trace("new request", "request addr", f.addr) + // 2) chunk is requested, set requested flag + // launch a request iff none been launched yet + doRequest = !requested + requested = true + + // peer we requested from is gone. fall back to another + // and remove the peer from the peers map + case id := <-gone: + log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr) + peers.Delete(id.String()) + doRequest = requested + + // search timeout: too much time passed since the last request, + // extend the search to a new peer if we can find one + case <-waitC: + log.Trace("search timed out: rerequesting", "request addr", f.addr) + doRequest = requested + + // all Fetcher context closed, can quit + case <-ctx.Done(): + log.Trace("terminate fetcher", "request addr", f.addr) + // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from) + return + } + + // need to issue a new request + if doRequest { + var err error + sources, err = f.doRequest(ctx, gone, peers, sources) + if err != nil { + log.Warn("unable to request", "request addr", f.addr, "err", err) + } + } + + // if wait channel is not set, set it to a timer + if requested { + if wait == nil { + wait = time.NewTimer(searchTimeout) + defer wait.Stop() + waitC = wait.C + } else { + // stop the timer and drain the channel if it was not drained earlier + if !wait.Stop() { + select { + case <-wait.C: + default: + } + } + // reset the timer to go off after searchTimeout + wait.Reset(searchTimeout) + } + } + doRequest = false + } +} + +// doRequest attempts at finding a peer to request the chunk from +// * first it tries to request explicitly from peers that are known to have offered the chunk +// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address +// excluding those in the peersToSkip map +// * if no such peer is found an error is returned +// +// if a request is successful, +// * the peer's address is added to the set of peers to skip +// * the peer's address is removed from prospective sources, and +// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer) +func (f *Fetcher) doRequest(ctx context.Context, gone chan *discover.NodeID, peersToSkip *sync.Map, sources []*discover.NodeID) ([]*discover.NodeID, error) { + var i int + var sourceID *discover.NodeID + var quit chan struct{} + + req := &Request{ + Addr: f.addr, + SkipCheck: f.skipCheck, + peersToSkip: peersToSkip, + } + + foundSource := false + // iterate over known sources + for i = 0; i < len(sources); i++ { + req.Source = sources[i] + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err == nil { + // remove the peer from known sources + // Note: we can modify the source although we are looping on it, because we break from the loop immediately + sources = append(sources[:i], sources[i+1:]...) + foundSource = true + break + } + } + + // if there are no known sources, or none available, we try request from a closest node + if !foundSource { + req.Source = nil + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err != nil { + // if no peers found to request from + return sources, err + } + } + // add peer to the set of peers to skip from now + peersToSkip.Store(sourceID.String(), time.Now()) + + // if the quit channel is closed, it indicates that the source peer we requested from + // disconnected or terminated its streamer + // here start a go routine that watches this channel and reports the source peer on the gone channel + // this go routine quits if the fetcher global context is done to prevent process leak + go func() { + select { + case <-quit: + gone <- sourceID + case <-ctx.Done(): + } + }() + return sources, nil +} diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go new file mode 100644 index 000000000..21b81d652 --- /dev/null +++ b/swarm/network/fetcher_test.go @@ -0,0 +1,459 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +var requestedPeerID = discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") +var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + +// mockRequester pushes every request to the requestC channel when its doRequest function is called +type mockRequester struct { + // requests []Request + requestC chan *Request // when a request is coming it is pushed to requestC + waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional) + ctr int //counts the number of requests + quitC chan struct{} +} + +func newMockRequester(waitTimes ...time.Duration) *mockRequester { + return &mockRequester{ + requestC: make(chan *Request), + waitTimes: waitTimes, + quitC: make(chan struct{}), + } +} + +func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*discover.NodeID, chan struct{}, error) { + waitTime := time.Duration(0) + if m.ctr < len(m.waitTimes) { + waitTime = m.waitTimes[m.ctr] + m.ctr++ + } + time.Sleep(waitTime) + m.requestC <- request + + // if there is a Source in the request use that, if not use the global requestedPeerId + source := request.Source + if source == nil { + source = &requestedPeerID + } + return source, m.quitC, nil +} + +// TestFetcherSingleRequest creates a Fetcher using mockRequester, and run it with a sample set of peers to skip. +// mockRequester pushes a Request on a channel every time the request function is called. Using +// this channel we test if calling Fetcher.Request calls the request function, and whether it uses +// the correct peers to skip which we provided for the fetcher.run function. +func TestFetcherSingleRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peers := []string{"a", "b", "c", "d"} + peersToSkip := &sync.Map{} + for _, p := range peers { + peersToSkip.Store(p, time.Now()) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + fetcher.Request(rctx) + + select { + case request := <-requester.requestC: + // request should contain all peers from peersToSkip provided to the fetcher + for _, p := range peers { + if _, ok := request.peersToSkip.Load(p); !ok { + t.Fatalf("request.peersToSkip misses peer") + } + } + + // source peer should be also added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(requestedPeerID.String()); !ok { + t.Fatalf("request.peersToSkip does not contain peer returned by the request function") + } + + // fetch should trigger a request, if it doesn't happen in time, test should fail + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetch timeout") + } +} + +// TestCancelStopsFetcher tests that a cancelled fetcher does not initiate further requests even if its fetch function is called +func TestFetcherCancelStopsFetcher(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + + // we start the fetcher, and then we immediately cancel the context + go fetcher.run(ctx, peersToSkip) + cancel() + + rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer rcancel() + // we call Request with an active context + fetcher.Request(rctx) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } +} + +// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request +func TestFetcherCancelStopsRequest(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // we start the fetcher with an active context + go fetcher.run(ctx, peersToSkip) + + rctx, rcancel := context.WithCancel(context.Background()) + rcancel() + + // we call Request with a cancelled context + fetcher.Request(rctx) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetch function initiated request") + case <-time.After(200 * time.Millisecond): + } + + // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled + rctx = context.Background() + fetcher.Request(rctx) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("expected request") + } +} + +// TestOfferUsesSource tests Fetcher Offer behavior. +// In this case there should be 1 (and only one) request initiated from the source peer, and the +// source nodeid should appear in the peersToSkip map. +func TestFetcherOfferUsesSource(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + // call the Offer function with the source peer + fetcher.Offer(rctx, &sourcePeerID) + + // fetcher should not initiate request + select { + case <-requester.requestC: + t.Fatalf("fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } + + // call Request after the Offer + rctx = context.Background() + fetcher.Request(rctx) + + // there should be exactly 1 request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Expected source id %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + select { + case <-requester.requestC: + t.Fatalf("Fetcher number of requests expected 1 got 2") + case <-time.After(200 * time.Millisecond): + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + // call Request first + rctx := context.Background() + fetcher.Request(rctx) + + // there should be a request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if request.Source != nil { + t.Fatalf("Incorrect source peer id, expected nil got %v", request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // after the Request call Offer + fetcher.Offer(context.Background(), &sourcePeerID) + + // there should be a request coming from fetcher + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Incorrect source peer id, expected %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +// TestFetcherRetryOnTimeout tests that fetch retries after searchTimeOut has passed +func TestFetcherRetryOnTimeout(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + // set searchTimeOut to low value so the test is quicker + defer func(t time.Duration) { + searchTimeout = t + }(searchTimeout) + searchTimeout = 250 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + // call the fetch function with an active context + rctx := context.Background() + fetcher.Request(rctx) + + // after 100ms the first request should be initiated + time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did not initiate request") + } + + // after another 100ms no new request should be initiated, because search timeout is 250ms + time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + t.Fatalf("unexpected request from fetcher") + default: + } + + // after another 300ms search timeout is over, there should be a new request + time.Sleep(300 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did not retry request") + } +} + +// TestFetcherFactory creates a FetcherFactory and checks if the factory really creates and starts +// a Fetcher when it return a fetch function. We test the fetching functionality just by checking if +// a request is initiated when the fetch function is called +func TestFetcherFactory(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcherFactory := NewFetcherFactory(requester.doRequest, false) + + peersToSkip := &sync.Map{} + + fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip) + + fetcher.Request(context.Background()) + + // check if the created fetchFunction really starts a fetcher and initiates a request + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetch timeout") + } + +} + +func TestFetcherRequestQuitRetriesRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + // make sure searchTimeout is long so it is sure the request is not retried because of timeout + defer func(t time.Duration) { + searchTimeout = t + }(searchTimeout) + searchTimeout = 10 * time.Second + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + fetcher.Request(rctx) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not initiated") + } + + close(requester.quitC) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not initiated after failed request") + } +} + +// TestRequestSkipPeer checks if PeerSkip function will skip provided peer +// and not skip unknown one. +func TestRequestSkipPeer(t *testing.T) { + addr := make([]byte, 32) + peers := []discover.NodeID{ + discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + } + + peersToSkip := new(sync.Map) + peersToSkip.Store(peers[0].String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peers[0].String()) { + t.Errorf("peer not skipped") + } + + if r.SkipPeer(peers[1].String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerExpired checks if a peer to skip is not skipped +// after RequestTimeout has passed. +func TestRequestSkipPeerExpired(t *testing.T) { + addr := make([]byte, 32) + peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if r.SkipPeer(peer.String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerPermanent checks if a peer to skip is not skipped +// after RequestTimeout is not skipped if it is set for a permanent skipping +// by value to peersToSkip map is not time.Duration. +func TestRequestSkipPeerPermanent(t *testing.T) { + addr := make([]byte, 32) + peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), true) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } +} diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go index fab638c9e..538502605 100644 --- a/swarm/network/priorityqueue/priorityqueue.go +++ b/swarm/network/priorityqueue/priorityqueue.go @@ -28,10 +28,13 @@ package priorityqueue import ( "context" "errors" + + "github.com/ethereum/go-ethereum/log" ) var ( - errContention = errors.New("queue contention") + ErrContention = errors.New("contention") + errBadPriority = errors.New("bad priority") wakey = struct{}{} @@ -39,7 +42,7 @@ var ( // PriorityQueue is the basic structure type PriorityQueue struct { - queues []chan interface{} + Queues []chan interface{} wakeup chan struct{} } @@ -50,27 +53,29 @@ func New(n int, l int) *PriorityQueue { queues[i] = make(chan interface{}, l) } return &PriorityQueue{ - queues: queues, + Queues: queues, wakeup: make(chan struct{}, 1), } } // Run is a forever loop popping items from the queues func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) { - top := len(pq.queues) - 1 + top := len(pq.Queues) - 1 p := top READ: for { - q := pq.queues[p] + q := pq.Queues[p] select { case <-ctx.Done(): return case x := <-q: + log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p])) f(x) p = top default: if p > 0 { p-- + log.Trace("priority.queue p > 0", "p", p) continue READ } p = top @@ -78,6 +83,7 @@ READ: case <-ctx.Done(): return case <-pq.wakeup: + log.Trace("priority.queue wakeup", "p", p) } } } @@ -85,23 +91,15 @@ READ: // Push pushes an item to the appropriate queue specified in the priority argument // if context is given it waits until either the item is pushed or the Context aborts -// otherwise returns errContention if the queue is full -func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error { - if p < 0 || p >= len(pq.queues) { +func (pq *PriorityQueue) Push(x interface{}, p int) error { + if p < 0 || p >= len(pq.Queues) { return errBadPriority } - if ctx == nil { - select { - case pq.queues[p] <- x: - default: - return errContention - } - } else { - select { - case pq.queues[p] <- x: - case <-ctx.Done(): - return ctx.Err() - } + log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p])) + select { + case pq.Queues[p] <- x: + default: + return ErrContention } select { case pq.wakeup <- wakey: diff --git a/swarm/network/priorityqueue/priorityqueue_test.go b/swarm/network/priorityqueue/priorityqueue_test.go index cd54250f8..ed8b575c2 100644 --- a/swarm/network/priorityqueue/priorityqueue_test.go +++ b/swarm/network/priorityqueue/priorityqueue_test.go @@ -30,7 +30,7 @@ func TestPriorityQueue(t *testing.T) { results = append(results, v.(string)) wg.Done() }) - pq.Push(context.Background(), "2.0", 2) + pq.Push("2.0", 2) wg.Wait() if results[0] != "2.0" { t.Errorf("expected first result %q, got %q", "2.0", results[0]) @@ -66,7 +66,7 @@ Loop: { priorities: []int{0, 0, 0}, values: []string{"0.0", "0.0", "0.1"}, - errors: []error{nil, nil, errContention}, + errors: []error{nil, nil, ErrContention}, }, } { var results []string @@ -74,7 +74,7 @@ Loop: pq := New(3, 2) wg.Add(len(tc.values)) for j, value := range tc.values { - err := pq.Push(nil, value, tc.priorities[j]) + err := pq.Push(value, tc.priorities[j]) if tc.errors != nil && err != tc.errors[j] { t.Errorf("expected push error %v, got %v", tc.errors[j], err) continue Loop diff --git a/swarm/network/simulation/simulation.go b/swarm/network/simulation/simulation.go index 74f9d98ee..096f7322c 100644 --- a/swarm/network/simulation/simulation.go +++ b/swarm/network/simulation/simulation.go @@ -94,7 +94,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) { } s.Net = simulations.NewNetwork( - adapters.NewSimAdapter(adapterServices), + adapters.NewTCPAdapter(adapterServices), &simulations.NetworkConfig{ID: "0"}, ) @@ -164,17 +164,6 @@ var maxParallelCleanups = 10 func (s *Simulation) Close() { close(s.done) - // Close all connections before calling the Network Shutdown. - // It is possible that p2p.Server.Stop will block if there are - // existing connections. - for _, c := range s.Net.Conns { - if c.Up { - s.Net.Disconnect(c.One, c.Other) - } - } - s.shutdownWG.Wait() - s.Net.Shutdown() - sem := make(chan struct{}, maxParallelCleanups) s.mu.RLock() cleanupFuncs := make([]func(), len(s.cleanupFuncs)) @@ -206,6 +195,9 @@ func (s *Simulation) Close() { } close(s.runC) } + + s.shutdownWG.Wait() + s.Net.Shutdown() } // Done returns a channel that is closed when the simulation diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 491dc9fd5..e0d776e34 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora return nil, nil, nil, removeDataDir, err } - db := storage.NewDBAPI(localStore) - delivery := NewDelivery(to, db) - streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, nil, removeDataDir, err + } + + delivery := NewDelivery(to, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) teardown := func() { streamer.Close() removeDataDir() @@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } } -func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) { +func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } -func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) { +func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) - rrs.stores[idx].Put(ctx, chunk) + return rrs.stores[idx].Put(ctx, chunk) } func (rrs *roundRobinStore) Close() { diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 627352535..d0f27eebc 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -19,12 +19,11 @@ package stream import ( "context" "errors" - "time" - "github.com/ethereum/go-ethereum/common" + "fmt" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/discover" - cp "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -46,39 +45,34 @@ var ( ) type Delivery struct { - db *storage.DBAPI - kad *network.Kademlia - receiveC chan *ChunkDeliveryMsg - getPeer func(discover.NodeID) *Peer + chunkStore storage.SyncChunkStore + kad *network.Kademlia + getPeer func(discover.NodeID) *Peer } -func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery { - d := &Delivery{ - db: db, - kad: kad, - receiveC: make(chan *ChunkDeliveryMsg, deliveryCap), +func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery { + return &Delivery{ + chunkStore: chunkStore, + kad: kad, } - - go d.processReceivedChunks() - return d } // SwarmChunkServer implements Server type SwarmChunkServer struct { deliveryC chan []byte batchC chan []byte - db *storage.DBAPI + chunkStore storage.ChunkStore currentLen uint64 quit chan struct{} } // NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer { +func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - db: db, - quit: make(chan struct{}), + deliveryC: make(chan []byte, deliveryCap), + batchC: make(chan []byte), + chunkStore: chunkStore, + quit: make(chan struct{}), } go s.processDeliveries() return s @@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() { // GetData retrives chunk data from db store func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.chunkStore.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // RetrieveRequestMsg is the protocol msg for chunk retrieve requests @@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) - if chunk.ReqC != nil { - if created { - if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { - log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) - chunk.SetErrored(storage.ErrChunkForward) - return nil - } + + var cancel func() + // TODO: do something with this hardcoded timeout, maybe use TTL in the future + ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout) + + go func() { + select { + case <-ctx.Done(): + case <-streamer.quit: } - go func() { - var osp opentracing.Span - ctx, osp = spancontext.StartSpan( - ctx, - "waiting.delivery") - defer osp.Finish() - - t := time.NewTimer(10 * time.Minute) - defer t.Stop() - - log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created) - start := time.Now() - select { - case <-chunk.ReqC: - log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start)) - case <-t.C: - log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr) - chunk.SetErrored(storage.ErrChunkTimeout) - return - } - chunk.SetErrored(nil) - - if req.SkipCheck { - err := sp.Deliver(ctx, chunk, s.priority) - if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) - sp.Drop(err) - } + cancel() + }() + + go func() { + chunk, err := d.chunkStore.Get(ctx, req.Addr) + if err != nil { + log.Warn("ChunkStore.Get can not retrieve chunk", "err", err) + return + } + if req.SkipCheck { + err = sp.Deliver(ctx, chunk, s.priority) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - streamer.deliveryC <- chunk.Addr[:] - }() - return nil - } - // TODO: call the retrieve function of the outgoing syncer - if req.SkipCheck { - log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr) - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) + return } - return sp.Deliver(ctx, chunk, s.priority) - } - streamer.deliveryC <- chunk.Addr[:] + select { + case streamer.deliveryC <- chunk.Address()[:]: + case <-streamer.quit: + } + + }() + return nil } @@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } +// TODO: Fix context SNAFU func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { var osp opentracing.Span ctx, osp = spancontext.StartSpan( @@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch "chunk.delivery") defer osp.Finish() - req.peer = sp - d.receiveC <- req - return nil -} + processReceivedChunksCount.Inc(1) -func (d *Delivery) processReceivedChunks() { -R: - for req := range d.receiveC { - processReceivedChunksCount.Inc(1) - - if len(req.SData) > cp.DefaultSize+8 { - log.Warn("received chunk is bigger than expected", "len", len(req.SData)) - continue R - } - - // this should be has locally - chunk, err := d.db.Get(context.TODO(), req.Addr) - if err == nil { - continue R - } - if err != storage.ErrFetching { - log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk) - continue R - } - select { - case <-chunk.ReqC: - log.Error("someone else delivered?", "hash", chunk.Addr.Hex()) - continue R - default: - } - - chunk.SData = req.SData - d.db.Put(context.TODO(), chunk) - - go func(req *ChunkDeliveryMsg) { - err := chunk.WaitToStore() + go func() { + req.peer = sp + err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) + if err != nil { if err == storage.ErrChunkInvalid { + // we removed this log because it spams the logs + // TODO: Enable this log line + // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, ) req.peer.Drop(err) } - }(req) - } + } + }() + return nil } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { - var success bool - var err error +func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { requestFromPeersCount.Inc(1) + var sp *Peer + spID := req.Source - d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool { - spId := p.ID() - for _, p := range peersToSkip { - if p == spId { - log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId) + if spID != nil { + sp = d.getPeer(*spID) + if sp == nil { + return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) + } + } else { + d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { + id := p.ID() + // TODO: skip light nodes that do not accept retrieve requests + if req.SkipPeer(id.String()) { + log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) return true } - } - sp := d.getPeer(spId) + sp = d.getPeer(id) + if sp == nil { + log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) + return true + } + spID = &id + return false + }) if sp == nil { - log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) - return true + return nil, nil, errors.New("no peer found") } - err = sp.SendPriority(ctx, &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: skipCheck, - }, Top) - if err != nil { - return true - } - requestFromPeersEachCount.Inc(1) - success = true - return false - }) - if success { - return nil } - return errors.New("no peer found") + + err := sp.SendPriority(ctx, &RetrieveRequestMsg{ + Addr: req.Addr, + SkipCheck: req.SkipCheck, + }, Top) + if err != nil { + return nil, nil, err + } + requestFromPeersEachCount.Inc(1) + + return spID, sp.quit, nil } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 972cc859a..ece54d4ee 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) { peerID := tester.IDs[0] - streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true) + ctx := context.Background() + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + streamer.delivery.RequestFromPeers(ctx, req) err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { { Code: 5, Msg: &RetrieveRequestMsg{ - Addr: chunk.Addr[:], + Addr: chunk.Address()[:], }, Peer: peerID, }, @@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { }) hash := storage.Address(hash0[:]) - chunk := storage.NewChunk(hash, nil) - chunk.SData = hash - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk := storage.NewChunk(hash, hash) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } hash = storage.Address(hash1[:]) - chunk = storage.NewChunk(hash, nil) - chunk.SData = hash1[:] - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk = storage.NewChunk(hash, hash1[:]) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { chunkKey := hash0[:] chunkData := hash1[:] - chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey) - - if !created { - t.Fatal("chunk already exists") - } - select { - case <-chunk.ReqC: - t.Fatal("chunk is already received") - default: - } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { }, }, p2ptest.Exchange{ - Label: "ChunkDeliveryRequest message", + Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ { Code: 6, @@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { if err != nil { t.Fatalf("Expected no error, got %v", err) } - - timeout := time.NewTimer(1 * time.Second) - - select { - case <-timeout.C: - t.Fatal("timeout receiving chunk") - case <-chunk.ReqC: + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // wait for the chunk to get stored + storedChunk, err := localStore.Get(ctx, chunkKey) + for err != nil { + select { + case <-ctx.Done(): + t.Fatalf("Chunk is not in localstore after timeout, err: %v", err) + default: + } + storedChunk, err = localStore.Get(ctx, chunkKey) + time.Sleep(50 * time.Millisecond) } - storedChunk, err := localStore.Get(context.TODO(), chunkKey) if err != nil { t.Fatalf("Expected no error, got %v", err) } - if !bytes.Equal(storedChunk.SData, chunkData) { + if !bytes.Equal(storedChunk.Data(), chunkData) { t.Fatal("Retrieved chunk has different data than original") } @@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) @@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - id := ctx.Config.ID addr := network.NewAddrFromNodeID(id) store, datadir, err := createTestLocalStorageForID(id, addr) @@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, DoSync: true, SyncUpdateDelay: 0, }) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index f4294134b..452aaca76 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -38,13 +38,18 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) -func TestIntervals(t *testing.T) { +func TestIntervalsLive(t *testing.T) { testIntervals(t, true, nil, false) - testIntervals(t, false, NewRange(9, 26), false) - testIntervals(t, true, NewRange(9, 26), false) - testIntervals(t, true, nil, true) +} + +func TestIntervalsHistory(t *testing.T) { + testIntervals(t, false, NewRange(9, 26), false) testIntervals(t, false, NewRange(9, 26), true) +} + +func TestIntervalsLiveAndHistory(t *testing.T) { + testIntervals(t, true, NewRange(9, 26), false) testIntervals(t, true, NewRange(9, 26), true) } @@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { os.RemoveAll(datadir) } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { - return newTestExternalClient(db), nil + return newTestExternalClient(netStore), nil }) r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil @@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() storer := nodeIDs[0] @@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { liveErrC := make(chan error) historyErrC := make(chan error) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - log.Error("WaitKademlia error: %v", "err", err) - return err - } - log.Debug("Watching for disconnections") disconnections := sim.PeerEvents( context.Background(), @@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), ) + err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) + if err != nil { + return err + } + go func() { for d := range disconnections { if d.Error != nil { @@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var liveHashesChan chan []byte liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true)) if err != nil { - log.Error("Subscription error: %v", "err", err) + log.Error("get hashes", "err", err) return } i := externalStreamSessionAt @@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var historyHashesChan chan []byte historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false)) if err != nil { + log.Error("get hashes", "err", err) return } @@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { } }() - err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) - if err != nil { - return err - } if err := <-liveErrC; err != nil { return err } @@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error { type testExternalClient struct { hashes chan []byte - db *storage.DBAPI + store storage.SyncChunkStore enableNotificationsC chan struct{} } -func newTestExternalClient(db *storage.DBAPI) *testExternalClient { +func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient { return &testExternalClient{ hashes: make(chan []byte), - db: db, + store: store, enableNotificationsC: make(chan struct{}), } } -func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(ctx, hash) - if chunk.ReqC == nil { +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { + wait := c.store.FetchFunc(ctx, storage.Address(hash)) + if wait == nil { return nil } - c.hashes <- hash - //NOTE: This was failing on go1.9.x with a deadlock. - //Sometimes this function would just block - //It is commented now, but it may be well worth after the chunk refactor - //to re-enable this and see if the problem has been addressed - /* - return func() { - return chunk.WaitToStore() + select { + case c.hashes <- hash: + case <-ctx.Done(): + log.Warn("testExternalClient NeedData context", "err", ctx.Err()) + return func(_ context.Context) error { + return ctx.Err() } - */ - return nil + } + return wait } func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index a19f63589..2e1a81e82 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -18,9 +18,7 @@ package stream import ( "context" - "errors" "fmt" - "sync" "time" "github.com/ethereum/go-ethereum/metrics" @@ -31,6 +29,8 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) +var syncBatchTimeout = 30 * time.Second + // Stream defines a unique stream identifier. type Stream struct { // Name is used for Client and Server functions identification. @@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e go func() { if err := p.SendOfferedHashes(os, from, to); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() @@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e } go func() { if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() } @@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if err != nil { return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err) } - wg := sync.WaitGroup{} + + ctr := 0 + errC := make(chan error) + ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) + + ctx = context.WithValue(ctx, "source", p.ID().String()) for i := 0; i < len(hashes); i += HashSize { hash := hashes[i : i+HashSize] if wait := c.NeedData(ctx, hash); wait != nil { + ctr++ want.Set(i/HashSize, true) - wg.Add(1) // create request and wait until the chunk data arrives and is stored - go func(w func()) { - w() - wg.Done() + go func(w func(context.Context) error) { + select { + case errC <- w(ctx): + case <-ctx.Done(): + } }(wait) } } - // done := make(chan bool) - // go func() { - // wg.Wait() - // close(done) - // }() - // go func() { - // select { - // case <-done: - // s.next <- s.batchDone(p, req, hashes) - // case <-time.After(1 * time.Second): - // p.Drop(errors.New("timeout waiting for batch to be delivered")) - // } - // }() + go func() { - wg.Wait() + defer cancel() + for i := 0; i < ctr; i++ { + select { + case err := <-errC: + if err != nil { + log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) + p.Drop(err) + return + } + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) + return + case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + } + } select { case c.next <- c.batchDone(p, req, hashes): case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) } }() // only send wantedKeysMsg if all missing chunks of the previous batch arrived @@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg c.sessionAt = req.From } from, to := c.nextBatch(req.To + 1) - log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) + log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID()) if from == to { return nil } @@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg To: to, } go func() { + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) select { - case <-time.After(120 * time.Second): - log.Warn("handleOfferedHashesMsg timeout, so dropping peer") - p.Drop(errors.New("handle offered hashes timeout")) - return case err := <-c.next: if err != nil { - log.Warn("c.next dropping peer", "err", err) + log.Warn("c.next error dropping peer", "err", err) p.Drop(err) return } case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) err := p.SendPriority(ctx, msg, c.priority) if err != nil { - log.Warn("SendPriority err, so dropping peer", "err", err) - p.Drop(err) + log.Warn("SendPriority error", "err", err) } }() return nil @@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) // launch in go routine since GetBatch blocks until new hashes arrive go func() { if err := p.SendOfferedHashes(s, req.From, req.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "err", err) } }() // go p.SendOfferedHashes(s, req.From, req.To) @@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) if err != nil { return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } - chunk := storage.NewChunk(hash, nil) - chunk.SData = data - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr) - } + chunk := storage.NewChunk(hash, data) if err := p.Deliver(ctx, chunk, s.priority); err != nil { return err } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 80b9ab711..1466a7a9c 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -33,8 +33,6 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) -var sendTimeout = 30 * time.Second - type notFoundError struct { t string s Stream @@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) - p.Send(wmsg.Context, wmsg.Msg) + err := p.Send(wmsg.Context, wmsg.Msg) + if err != nil { + log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) + p.Drop(err) + } }) + + // basic monitoring for pq contention + go func(pq *pq.PriorityQueue) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + var len_maxi int + var cap_maxi int + for k := range pq.Queues { + if len_maxi < len(pq.Queues[k]) { + len_maxi = len(pq.Queues[k]) + } + + if cap_maxi < cap(pq.Queues[k]) { + cap_maxi = cap(pq.Queues[k]) + } + } + + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi)) + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi)) + case <-p.quit: + return + } + } + }(p.pq) + go func() { <-p.quit cancel() @@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error { +func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error { var sp opentracing.Span ctx, sp = spancontext.StartSpan( ctx, @@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 defer sp.Finish() msg := &ChunkDeliveryMsg{ - Addr: chunk.Addr, - SData: chunk.SData, + Addr: chunk.Address(), + SData: chunk.Data(), } return p.SendPriority(ctx, msg, priority) } @@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - cctx, cancel := context.WithTimeout(context.Background(), sendTimeout) - defer cancel() wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, } - return p.pq.Push(cctx, wmsg, int(priority)) + err := p.pq.Push(wmsg, int(priority)) + if err == pq.ErrContention { + log.Warn("dropping peer on priority queue contention", "peer", p.ID()) + p.Drop(err) + } + return err } // SendOfferedHashes sends OfferedHashesMsg protocol msg diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 4ff947b21..19eaad34e 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 0, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucketKeyFileStore = simulation.BucketKey("filestore") bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 313019d6a..7cd09099c 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "runtime" "sync" "testing" "time" @@ -39,15 +40,20 @@ import ( mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) -const testMinProxBinSize = 2 const MaxTimeout = 600 type synctestConfig struct { - addrs [][]byte - hashes []storage.Address - idToChunksMap map[discover.NodeID][]int - chunksToNodesMap map[string][]int - addrToIDMap map[string]discover.NodeID + addrs [][]byte + hashes []storage.Address + idToChunksMap map[discover.NodeID][]int + //chunksToNodesMap map[string][]int + addrToIDMap map[string]discover.NodeID +} + +// Tests in this file should not request chunks from peers. +// This function will panic indicating that there is a problem if request has been made. +func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { + panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String())) } //This test is a syncing test for nodes. @@ -58,6 +64,9 @@ type synctestConfig struct { //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. func TestSyncingViaGlobalSync(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { @@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) { } func TestSyncingViaDirectSubscribe(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) - err := testSyncingViaDirectSubscribe(*chunks, *nodes) + err := testSyncingViaDirectSubscribe(t, *chunks, *nodes) if err != nil { t.Fatal(err) } @@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) { for _, chnk := range chnkCnt { for _, n := range nodeCnt { log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n)) - err := testSyncingViaDirectSubscribe(chnk, n) + err := testSyncingViaDirectSubscribe(t, chnk, n) if err != nil { t.Fatal(err) } @@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { t.Fatal(err) } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { conf.hashes = append(conf.hashes, hashes...) mapKeysToNodes(conf) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - return err - } - // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. allSuccess := false @@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { }() } for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { if result.Error != nil { t.Fatal(result.Error) } + log.Info("Simulation ended") } /* @@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. The snapshot should have 'streamer' in its service list. */ -func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { +func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { @@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, }) defer sim.Close() - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() conf := &synctestConfig{} @@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return err } + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { // or until the timeout is reached. allSuccess := false for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return result.Error } - log.Info("Simulation terminated") + log.Info("Simulation ended") return nil } @@ -462,10 +527,9 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //iterate over each bin and solicit needed subscription to bins kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool { //identify begin and start index of the bin(s) we want to subscribe to - histRange := &Range{} subCnt++ - err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top) + err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High) if err != nil { log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err)) return false @@ -478,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //map chunk keys to addresses which are responsible func mapKeysToNodes(conf *synctestConfig) { - kmap := make(map[string][]int) nodemap := make(map[string][]int) //build a pot for chunk hashes np := pot.NewPot(nil, 0) @@ -487,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) { indexmap[string(a)] = i np, _, _ = pot.Add(np, a, pof) } + + var kadMinProxSize = 2 + + ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs) + //for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes)) for i := 0; i < len(conf.hashes); i++ { - pl := 256 //highest possible proximity - var nns []int + var a []byte np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool { - a := val.([]byte) - if pl < 256 && pl != po { - return false - } - if pl == 256 || pl == po { - log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)])) - nns = append(nns, indexmap[string(a)]) - nodemap[string(a)] = append(nodemap[string(a)], i) - } - if pl == 256 && len(nns) >= testMinProxBinSize { - //maxProxBinSize has been reached at this po, so save it - //we will add all other nodes at the same po - pl = po - } - return true + // take the first address + a = val.([]byte) + return false }) - kmap[string(conf.hashes[i])] = nns + + nns := ppmap[common.Bytes2Hex(a)].NNSet + nns = append(nns, a) + + for _, p := range nns { + nodemap[string(p)] = append(nodemap[string(p)], i) + } } for addr, chunks := range nodemap { //this selects which chunks are expected to be found with the given node conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks } log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap)) - conf.chunksToNodesMap = kmap } //upload a file(chunks) to a single local node store diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index deffdfc3f..1f1f34b7b 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -32,10 +32,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -43,8 +41,8 @@ const ( Mid High Top - PriorityQueue // number of queues - PriorityQueueCap = 32 // queue capacity + PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top + PriorityQueueCap = 128 // queue capacity HashSize = 32 ) @@ -73,7 +71,7 @@ type RegistryOptions struct { } // NewRegistry is Streamer constructor -func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry { +func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { if options == nil { options = &RegistryOptions{} } @@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) { - return NewSwarmChunkServer(delivery.db), nil + return NewSwarmChunkServer(delivery.chunkStore), nil }) streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live)) + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) }) - RegisterSwarmSyncerServer(streamer, db) - RegisterSwarmSyncerClient(streamer, db) + RegisterSwarmSyncerServer(streamer, syncChunkStore) + RegisterSwarmSyncerClient(streamer, syncChunkStore) if options.DoSync { // latestIntC function ensures that @@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { return peer.Send(context.TODO(), msg) } -func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error { - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "registry.retrieve") - defer sp.Finish() - - return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck) -} - func (r *Registry) NodeInfo() interface{} { return nil } @@ -557,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData(context.Context, []byte) func() + NeedData(context.Context, []byte) func(context.Context) error BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 7523860c9..06e96b9a9 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -80,15 +80,17 @@ func newTestClient(t string) *testClient { } } -func (self *testClient) NeedData(ctx context.Context, hash []byte) func() { +func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { self.receivedHashes[string(hash)] = hash if bytes.Equal(hash, hash0[:]) { - return func() { + return func(context.Context) error { <-self.wait0 + return nil } } else if bytes.Equal(hash, hash2[:]) { - return func() { + return func(context.Context) error { <-self.wait2 + return nil } } return nil diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index d7febe4a3..e9811a678 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -28,7 +28,6 @@ import ( ) const ( - // BatchSize = 2 BatchSize = 128 ) @@ -38,35 +37,37 @@ const ( // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { po uint8 - db *storage.DBAPI + store storage.SyncChunkStore sessionAt uint64 start uint64 + live bool quit chan struct{} } // NewSwarmSyncerServer is contructor for SwarmSyncerServer -func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) { - sessionAt := db.CurrentBucketStorageIndex(po) +func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { + sessionAt := syncChunkStore.BinIndex(po) var start uint64 if live { start = sessionAt } return &SwarmSyncerServer{ po: po, - db: db, + store: syncChunkStore, sessionAt: sessionAt, start: start, + live: live, quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(live, po, db) + return NewSwarmSyncerServer(live, po, syncChunkStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() { close(s.quit) } -// GetSection retrieves the actual chunk from localstore +// GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.store.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // GetBatch retrieves the next batch of hashes from the dbstore func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { var batch []byte i := 0 - if from == 0 { - from = s.start - } - if to <= from || from >= s.sessionAt { - to = math.MaxUint64 + if s.live { + if from == 0 { + from = s.start + } + if to <= from || from >= s.sessionAt { + to = math.MaxUint64 + } + } else { + if (to < from && to != 0) || from > s.sessionAt { + return nil, 0, 0, nil, nil + } + if to == 0 || to > s.sessionAt { + to = s.sessionAt + } } + var ticker *time.Ticker defer func() { if ticker != nil { @@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 } metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) - err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool { - batch = append(batch, addr[:]...) + err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool { + batch = append(batch, key[:]...) i++ to = idx return i < BatchSize @@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 wait = true } - log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po)) + log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po)) return batch, from, to, nil, nil } @@ -146,28 +155,26 @@ type SwarmSyncerClient struct { sessionReader storage.LazySectionReader retrieveC chan *storage.Chunk storeC chan *storage.Chunk - db *storage.DBAPI + store storage.SyncChunkStore // chunker storage.Chunker - currentRoot storage.Address - requestFunc func(chunk *storage.Chunk) - end, start uint64 - peer *Peer - ignoreExistingRequest bool - stream Stream + currentRoot storage.Address + requestFunc func(chunk *storage.Chunk) + end, start uint64 + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - db: db, - peer: p, - ignoreExistingRequest: ignoreExistingRequest, - stream: stream, + store: store, + peer: p, + stream: stream, }, nil } // // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { +// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { // retrieveC := make(storage.Chunk, chunksCap) // RunChunkRequestor(p, retrieveC) // storeC := make(storage.Chunk, chunksCap) @@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) }) } // NeedData -func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) { - chunk, _ := s.db.GetOrCreateRequest(ctx, key) - // TODO: we may want to request from this peer anyway even if the request exists - - // ignoreExistingRequest is temporary commented out until its functionality is verified. - // For now, this optimization can be disabled. - if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) { - return nil - } - // create request and wait until the chunk data arrives and is stored - return func() { - chunk.WaitToStore() - } +func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { + return s.store.FetchFunc(ctx, key) } // BatchDone diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index f72aa3444..469d520f8 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck } } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) - bucket.Store(bucketKeyDB, db) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyDB, netStore) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + bucket.Store(bucketKeyDelivery, delivery) - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) return r, cleanup, nil @@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck if !ok { return fmt.Errorf("No DB") } - db := item.(*storage.DBAPI) - db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { + netStore := item.(*storage.NetStore) + netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { hashes[i] = append(hashes[i], addr) totalHashes++ hashCounts[i]++ @@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck if !ok { return fmt.Errorf("No DB") } - db := item.(*storage.DBAPI) - chunk, err := db.Get(ctx, key) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { - continue + db := item.(*storage.NetStore) + _, err := db.Get(ctx, key) + if err == nil { + found++ } - // needed for leveldb not to be closed? - // chunk.WaitToStore() - found++ } } log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total) |