diff options
Diffstat (limited to 'swarm/network/fetcher.go')
-rw-r--r-- | swarm/network/fetcher.go | 305 |
1 files changed, 305 insertions, 0 deletions
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go new file mode 100644 index 000000000..35e2f0132 --- /dev/null +++ b/swarm/network/fetcher.go @@ -0,0 +1,305 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +var searchTimeout = 1 * time.Second + +// Time to consider peer to be skipped. +// Also used in stream delivery. +var RequestTimeout = 10 * time.Second + +type RequestFunc func(context.Context, *Request) (*discover.NodeID, chan struct{}, error) + +// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and +// keeps it alive until all active requests are completed. This can happen: +// 1. either because the chunk is delivered +// 2. or becuse the requestor cancelled/timed out +// Fetcher self destroys itself after it is completed. +// TODO: cancel all forward requests after termination +type Fetcher struct { + protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk + addr storage.Address // the address of the chunk to be fetched + offerC chan *discover.NodeID // channel of sources (peer node id strings) + requestC chan struct{} + skipCheck bool +} + +type Request struct { + Addr storage.Address // chunk address + Source *discover.NodeID // nodeID of peer to request from (can be nil) + SkipCheck bool // whether to offer the chunk first or deliver directly + peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil) +} + +// NewRequest returns a new instance of Request based on chunk address skip check and +// a map of peers to skip. +func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request { + return &Request{ + Addr: addr, + SkipCheck: skipCheck, + peersToSkip: peersToSkip, + } +} + +// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk. +// Peers to skip are kept per Request and for a time period of RequestTimeout. +// This function is used in stream package in Delivery.RequestFromPeers to optimize +// requests for chunks. +func (r *Request) SkipPeer(nodeID string) bool { + val, ok := r.peersToSkip.Load(nodeID) + if !ok { + return false + } + t, ok := val.(time.Time) + if ok && time.Now().After(t.Add(RequestTimeout)) { + // deadine expired + r.peersToSkip.Delete(nodeID) + return false + } + return true +} + +// FetcherFactory is initialised with a request function and can create fetchers +type FetcherFactory struct { + request RequestFunc + skipCheck bool +} + +// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory +func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { + return &FetcherFactory{ + request: request, + skipCheck: skipCheck, + } +} + +// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to +// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting +// this chunk, to make sure we don't request back the chunks from them. +// The created Fetcher is started and returned. +func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher { + fetcher := NewFetcher(source, f.request, f.skipCheck) + go fetcher.run(ctx, peersToSkip) + return fetcher +} + +// NewFetcher creates a new Fetcher for the given chunk address using the given request function. +func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { + return &Fetcher{ + addr: addr, + protoRequestFunc: rf, + offerC: make(chan *discover.NodeID), + requestC: make(chan struct{}), + skipCheck: skipCheck, + } +} + +// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally. +func (f *Fetcher) Offer(ctx context.Context, source *discover.NodeID) { + // First we need to have this select to make sure that we return if context is done + select { + case <-ctx.Done(): + return + default: + } + + // This select alone would not guarantee that we return of context is done, it could potentially + // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.offerC <- source: + case <-ctx.Done(): + } +} + +// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally. +func (f *Fetcher) Request(ctx context.Context) { + // First we need to have this select to make sure that we return if context is done + select { + case <-ctx.Done(): + return + default: + } + + // This select alone would not guarantee that we return of context is done, it could potentially + // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.requestC <- struct{}{}: + case <-ctx.Done(): + } +} + +// start prepares the Fetcher +// it keeps the Fetcher alive within the lifecycle of the passed context +func (f *Fetcher) run(ctx context.Context, peers *sync.Map) { + var ( + doRequest bool // determines if retrieval is initiated in the current iteration + wait *time.Timer // timer for search timeout + waitC <-chan time.Time // timer channel + sources []*discover.NodeID // known sources, ie. peers that offered the chunk + requested bool // true if the chunk was actually requested + ) + gone := make(chan *discover.NodeID) // channel to signal that a peer we requested from disconnected + + // loop that keeps the fetching process alive + // after every request a timer is set. If this goes off we request again from another peer + // note that the previous request is still alive and has the chance to deliver, so + // rerequesting extends the search. ie., + // if a peer we requested from is gone we issue a new request, so the number of active + // requests never decreases + for { + select { + + // incoming offer + case source := <-f.offerC: + log.Trace("new source", "peer addr", source, "request addr", f.addr) + // 1) the chunk is offered by a syncing peer + // add to known sources + sources = append(sources, source) + // launch a request to the source iff the chunk was requested (not just expected because its offered by a syncing peer) + doRequest = requested + + // incoming request + case <-f.requestC: + log.Trace("new request", "request addr", f.addr) + // 2) chunk is requested, set requested flag + // launch a request iff none been launched yet + doRequest = !requested + requested = true + + // peer we requested from is gone. fall back to another + // and remove the peer from the peers map + case id := <-gone: + log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr) + peers.Delete(id.String()) + doRequest = requested + + // search timeout: too much time passed since the last request, + // extend the search to a new peer if we can find one + case <-waitC: + log.Trace("search timed out: rerequesting", "request addr", f.addr) + doRequest = requested + + // all Fetcher context closed, can quit + case <-ctx.Done(): + log.Trace("terminate fetcher", "request addr", f.addr) + // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from) + return + } + + // need to issue a new request + if doRequest { + var err error + sources, err = f.doRequest(ctx, gone, peers, sources) + if err != nil { + log.Warn("unable to request", "request addr", f.addr, "err", err) + } + } + + // if wait channel is not set, set it to a timer + if requested { + if wait == nil { + wait = time.NewTimer(searchTimeout) + defer wait.Stop() + waitC = wait.C + } else { + // stop the timer and drain the channel if it was not drained earlier + if !wait.Stop() { + select { + case <-wait.C: + default: + } + } + // reset the timer to go off after searchTimeout + wait.Reset(searchTimeout) + } + } + doRequest = false + } +} + +// doRequest attempts at finding a peer to request the chunk from +// * first it tries to request explicitly from peers that are known to have offered the chunk +// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address +// excluding those in the peersToSkip map +// * if no such peer is found an error is returned +// +// if a request is successful, +// * the peer's address is added to the set of peers to skip +// * the peer's address is removed from prospective sources, and +// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer) +func (f *Fetcher) doRequest(ctx context.Context, gone chan *discover.NodeID, peersToSkip *sync.Map, sources []*discover.NodeID) ([]*discover.NodeID, error) { + var i int + var sourceID *discover.NodeID + var quit chan struct{} + + req := &Request{ + Addr: f.addr, + SkipCheck: f.skipCheck, + peersToSkip: peersToSkip, + } + + foundSource := false + // iterate over known sources + for i = 0; i < len(sources); i++ { + req.Source = sources[i] + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err == nil { + // remove the peer from known sources + // Note: we can modify the source although we are looping on it, because we break from the loop immediately + sources = append(sources[:i], sources[i+1:]...) + foundSource = true + break + } + } + + // if there are no known sources, or none available, we try request from a closest node + if !foundSource { + req.Source = nil + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err != nil { + // if no peers found to request from + return sources, err + } + } + // add peer to the set of peers to skip from now + peersToSkip.Store(sourceID.String(), time.Now()) + + // if the quit channel is closed, it indicates that the source peer we requested from + // disconnected or terminated its streamer + // here start a go routine that watches this channel and reports the source peer on the gone channel + // this go routine quits if the fetcher global context is done to prevent process leak + go func() { + select { + case <-quit: + gone <- sourceID + case <-ctx.Done(): + } + }() + return sources, nil +} |