aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go739
1 files changed, 739 insertions, 0 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
new file mode 100644
index 000000000..9b4658c51
--- /dev/null
+++ b/swarm/network/stream/stream.go
@@ -0,0 +1,739 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package stream
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
+ "github.com/ethereum/go-ethereum/swarm/pot"
+ "github.com/ethereum/go-ethereum/swarm/state"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const (
+ Low uint8 = iota
+ Mid
+ High
+ Top
+ PriorityQueue // number of queues
+ PriorityQueueCap = 32 // queue capacity
+ HashSize = 32
+)
+
+// Registry registry for outgoing and incoming streamer constructors
+type Registry struct {
+ api *API
+ addr *network.BzzAddr
+ skipCheck bool
+ clientMu sync.RWMutex
+ serverMu sync.RWMutex
+ peersMu sync.RWMutex
+ serverFuncs map[string]func(*Peer, string, bool) (Server, error)
+ clientFuncs map[string]func(*Peer, string, bool) (Client, error)
+ peers map[discover.NodeID]*Peer
+ delivery *Delivery
+ intervalsStore state.Store
+ doRetrieve bool
+}
+
+// RegistryOptions holds optional values for NewRegistry constructor.
+type RegistryOptions struct {
+ SkipCheck bool
+ DoSync bool
+ DoRetrieve bool
+ SyncUpdateDelay time.Duration
+}
+
+// NewRegistry is Streamer constructor
+func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
+ if options == nil {
+ options = &RegistryOptions{}
+ }
+ if options.SyncUpdateDelay <= 0 {
+ options.SyncUpdateDelay = 15 * time.Second
+ }
+ streamer := &Registry{
+ addr: addr,
+ skipCheck: options.SkipCheck,
+ serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)),
+ clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)),
+ peers: make(map[discover.NodeID]*Peer),
+ delivery: delivery,
+ intervalsStore: intervalsStore,
+ doRetrieve: options.DoRetrieve,
+ }
+ streamer.api = NewAPI(streamer)
+ delivery.getPeer = streamer.getPeer
+ streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
+ return NewSwarmChunkServer(delivery.db), nil
+ })
+ streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
+ return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
+ })
+ RegisterSwarmSyncerServer(streamer, db)
+ RegisterSwarmSyncerClient(streamer, db)
+
+ if options.DoSync {
+ // latestIntC function ensures that
+ // - receiving from the in chan is not blocked by processing inside the for loop
+ // - the latest int value is delivered to the loop after the processing is done
+ // In context of NeighbourhoodDepthC:
+ // after the syncing is done updating inside the loop, we do not need to update on the intermediate
+ // depth changes, only to the latest one
+ latestIntC := func(in <-chan int) <-chan int {
+ out := make(chan int, 1)
+
+ go func() {
+ defer close(out)
+
+ for i := range in {
+ select {
+ case <-out:
+ default:
+ }
+ out <- i
+ }
+ }()
+
+ return out
+ }
+
+ go func() {
+ // wait for kademlia table to be healthy
+ time.Sleep(options.SyncUpdateDelay)
+
+ kad := streamer.delivery.overlay.(*network.Kademlia)
+ depthC := latestIntC(kad.NeighbourhoodDepthC())
+ addressBookSizeC := latestIntC(kad.AddrCountC())
+
+ // initial requests for syncing subscription to peers
+ streamer.updateSyncing()
+
+ for depth := range depthC {
+ log.Debug("Kademlia neighbourhood depth change", "depth", depth)
+
+ // Prevent too early sync subscriptions by waiting until there are no
+ // new peers connecting. Sync streams updating will be done after no
+ // peers are connected for at least SyncUpdateDelay period.
+ timer := time.NewTimer(options.SyncUpdateDelay)
+ // Hard limit to sync update delay, preventing long delays
+ // on a very dynamic network
+ maxTimer := time.NewTimer(3 * time.Minute)
+ loop:
+ for {
+ select {
+ case <-maxTimer.C:
+ // force syncing update when a hard timeout is reached
+ log.Trace("Sync subscriptions update on hard timeout")
+ // request for syncing subscription to new peers
+ streamer.updateSyncing()
+ break loop
+ case <-timer.C:
+ // start syncing as no new peers has been added to kademlia
+ // for some time
+ log.Trace("Sync subscriptions update")
+ // request for syncing subscription to new peers
+ streamer.updateSyncing()
+ break loop
+ case size := <-addressBookSizeC:
+ log.Trace("Kademlia address book size changed on depth change", "size", size)
+ // new peers has been added to kademlia,
+ // reset the timer to prevent early sync subscriptions
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timer.Reset(options.SyncUpdateDelay)
+ }
+ }
+ timer.Stop()
+ maxTimer.Stop()
+ }
+ }()
+ }
+
+ return streamer
+}
+
+// RegisterClient registers an incoming streamer constructor
+func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
+ r.clientMu.Lock()
+ defer r.clientMu.Unlock()
+
+ r.clientFuncs[stream] = f
+}
+
+// RegisterServer registers an outgoing streamer constructor
+func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error)) {
+ r.serverMu.Lock()
+ defer r.serverMu.Unlock()
+
+ r.serverFuncs[stream] = f
+}
+
+// GetClient accessor for incoming streamer constructors
+func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error) {
+ r.clientMu.RLock()
+ defer r.clientMu.RUnlock()
+
+ f := r.clientFuncs[stream]
+ if f == nil {
+ return nil, fmt.Errorf("stream %v not registered", stream)
+ }
+ return f, nil
+}
+
+// GetServer accessor for incoming streamer constructors
+func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error) {
+ r.serverMu.RLock()
+ defer r.serverMu.RUnlock()
+
+ f := r.serverFuncs[stream]
+ if f == nil {
+ return nil, fmt.Errorf("stream %v not registered", stream)
+ }
+ return f, nil
+}
+
+func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Range, prio uint8) error {
+ // check if the stream is registered
+ if _, err := r.GetServerFunc(s.Name); err != nil {
+ return err
+ }
+
+ peer := r.getPeer(peerId)
+ if peer == nil {
+ return fmt.Errorf("peer not found %v", peerId)
+ }
+
+ if _, err := peer.getServer(s); err != nil {
+ if e, ok := err.(*notFoundError); ok && e.t == "server" {
+ // request subscription only if the server for this stream is not created
+ log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h)
+ return peer.Send(&RequestSubscriptionMsg{
+ Stream: s,
+ History: h,
+ Priority: prio,
+ })
+ }
+ return err
+ }
+ log.Trace("RequestSubscription: already subscribed", "peer", peerId, "stream", s, "history", h)
+ return nil
+}
+
+// Subscribe initiates the streamer
+func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priority uint8) error {
+ // check if the stream is registered
+ if _, err := r.GetClientFunc(s.Name); err != nil {
+ return err
+ }
+
+ peer := r.getPeer(peerId)
+ if peer == nil {
+ return fmt.Errorf("peer not found %v", peerId)
+ }
+
+ var to uint64
+ if !s.Live && h != nil {
+ to = h.To
+ }
+
+ err := peer.setClientParams(s, newClientParams(priority, to))
+ if err != nil {
+ return err
+ }
+
+ if s.Live && h != nil {
+ if err := peer.setClientParams(
+ getHistoryStream(s),
+ newClientParams(getHistoryPriority(priority), h.To),
+ ); err != nil {
+ return err
+ }
+ }
+
+ msg := &SubscribeMsg{
+ Stream: s,
+ History: h,
+ Priority: priority,
+ }
+ log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
+
+ return peer.SendPriority(msg, priority)
+}
+
+func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error {
+ peer := r.getPeer(peerId)
+ if peer == nil {
+ return fmt.Errorf("peer not found %v", peerId)
+ }
+
+ msg := &UnsubscribeMsg{
+ Stream: s,
+ }
+ log.Debug("Unsubscribe ", "peer", peerId, "stream", s)
+
+ if err := peer.Send(msg); err != nil {
+ return err
+ }
+ return peer.removeClient(s)
+}
+
+// Quit sends the QuitMsg to the peer to remove the
+// stream peer client and terminate the streaming.
+func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
+ peer := r.getPeer(peerId)
+ if peer == nil {
+ log.Debug("stream quit: peer not found", "peer", peerId, "stream", s)
+ // if the peer is not found, abort the request
+ return nil
+ }
+
+ msg := &QuitMsg{
+ Stream: s,
+ }
+ log.Debug("Quit ", "peer", peerId, "stream", s)
+
+ return peer.Send(msg)
+}
+
+func (r *Registry) Retrieve(chunk *storage.Chunk) error {
+ return r.delivery.RequestFromPeers(chunk.Addr[:], r.skipCheck)
+}
+
+func (r *Registry) NodeInfo() interface{} {
+ return nil
+}
+
+func (r *Registry) PeerInfo(id discover.NodeID) interface{} {
+ return nil
+}
+
+func (r *Registry) Close() error {
+ return r.intervalsStore.Close()
+}
+
+func (r *Registry) getPeer(peerId discover.NodeID) *Peer {
+ r.peersMu.RLock()
+ defer r.peersMu.RUnlock()
+
+ return r.peers[peerId]
+}
+
+func (r *Registry) setPeer(peer *Peer) {
+ r.peersMu.Lock()
+ r.peers[peer.ID()] = peer
+ metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
+ r.peersMu.Unlock()
+}
+
+func (r *Registry) deletePeer(peer *Peer) {
+ r.peersMu.Lock()
+ delete(r.peers, peer.ID())
+ metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
+ r.peersMu.Unlock()
+}
+
+func (r *Registry) peersCount() (c int) {
+ r.peersMu.Lock()
+ c = len(r.peers)
+ r.peersMu.Unlock()
+ return
+}
+
+// Run protocol run function
+func (r *Registry) Run(p *network.BzzPeer) error {
+ sp := NewPeer(p.Peer, r)
+ r.setPeer(sp)
+ defer r.deletePeer(sp)
+ defer close(sp.quit)
+ defer sp.close()
+
+ if r.doRetrieve {
+ err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", false), nil, Top)
+ if err != nil {
+ return err
+ }
+ }
+
+ return sp.Run(sp.HandleMsg)
+}
+
+// updateSyncing subscribes to SYNC streams by iterating over the
+// kademlia connections and bins. If there are existing SYNC streams
+// and they are no longer required after iteration, request to Quit
+// them will be send to appropriate peers.
+func (r *Registry) updateSyncing() {
+ // if overlay in not Kademlia, panic
+ kad := r.delivery.overlay.(*network.Kademlia)
+
+ // map of all SYNC streams for all peers
+ // used at the and of the function to remove servers
+ // that are not needed anymore
+ subs := make(map[discover.NodeID]map[Stream]struct{})
+ r.peersMu.RLock()
+ for id, peer := range r.peers {
+ peer.serverMu.RLock()
+ for stream := range peer.servers {
+ if stream.Name == "SYNC" {
+ if _, ok := subs[id]; !ok {
+ subs[id] = make(map[Stream]struct{})
+ }
+ subs[id][stream] = struct{}{}
+ }
+ }
+ peer.serverMu.RUnlock()
+ }
+ r.peersMu.RUnlock()
+
+ // request subscriptions for all nodes and bins
+ kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(conn network.OverlayConn, bin int) bool {
+ p := conn.(network.Peer)
+ log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin))
+
+ // bin is always less then 256 and it is safe to convert it to type uint8
+ stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true)
+ if streams, ok := subs[p.ID()]; ok {
+ // delete live and history streams from the map, so that it won't be removed with a Quit request
+ delete(streams, stream)
+ delete(streams, getHistoryStream(stream))
+ }
+ err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High)
+ if err != nil {
+ log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream)
+ return false
+ }
+ return true
+ })
+
+ // remove SYNC servers that do not need to be subscribed
+ for id, streams := range subs {
+ if len(streams) == 0 {
+ continue
+ }
+ peer := r.getPeer(id)
+ if peer == nil {
+ continue
+ }
+ for stream := range streams {
+ log.Debug("Remove sync server", "peer", id, "stream", stream)
+ err := r.Quit(peer.ID(), stream)
+ if err != nil && err != p2p.ErrShuttingDown {
+ log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream)
+ }
+ }
+ }
+}
+
+func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := protocols.NewPeer(p, rw, Spec)
+ bzzPeer := network.NewBzzTestPeer(peer, r.addr)
+ r.delivery.overlay.On(bzzPeer)
+ defer r.delivery.overlay.Off(bzzPeer)
+ return r.Run(bzzPeer)
+}
+
+// HandleMsg is the message handler that delegates incoming messages
+func (p *Peer) HandleMsg(msg interface{}) error {
+ switch msg := msg.(type) {
+
+ case *SubscribeMsg:
+ return p.handleSubscribeMsg(msg)
+
+ case *SubscribeErrorMsg:
+ return p.handleSubscribeErrorMsg(msg)
+
+ case *UnsubscribeMsg:
+ return p.handleUnsubscribeMsg(msg)
+
+ case *OfferedHashesMsg:
+ return p.handleOfferedHashesMsg(msg)
+
+ case *TakeoverProofMsg:
+ return p.handleTakeoverProofMsg(msg)
+
+ case *WantedHashesMsg:
+ return p.handleWantedHashesMsg(msg)
+
+ case *ChunkDeliveryMsg:
+ return p.streamer.delivery.handleChunkDeliveryMsg(p, msg)
+
+ case *RetrieveRequestMsg:
+ return p.streamer.delivery.handleRetrieveRequestMsg(p, msg)
+
+ case *RequestSubscriptionMsg:
+ return p.handleRequestSubscription(msg)
+
+ case *QuitMsg:
+ return p.handleQuitMsg(msg)
+
+ default:
+ return fmt.Errorf("unknown message type: %T", msg)
+ }
+}
+
+type server struct {
+ Server
+ stream Stream
+ priority uint8
+ currentBatch []byte
+}
+
+// Server interface for outgoing peer Streamer
+type Server interface {
+ SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
+ GetData([]byte) ([]byte, error)
+ Close()
+}
+
+type client struct {
+ Client
+ stream Stream
+ priority uint8
+ sessionAt uint64
+ to uint64
+ next chan error
+ quit chan struct{}
+
+ intervalsKey string
+ intervalsStore state.Store
+}
+
+func peerStreamIntervalsKey(p *Peer, s Stream) string {
+ return p.ID().String() + s.String()
+}
+
+func (c client) AddInterval(start, end uint64) (err error) {
+ i := &intervals.Intervals{}
+ err = c.intervalsStore.Get(c.intervalsKey, i)
+ if err != nil {
+ return err
+ }
+ i.Add(start, end)
+ return c.intervalsStore.Put(c.intervalsKey, i)
+}
+
+func (c client) NextInterval() (start, end uint64, err error) {
+ i := &intervals.Intervals{}
+ err = c.intervalsStore.Get(c.intervalsKey, i)
+ if err != nil {
+ return 0, 0, err
+ }
+ start, end = i.Next()
+ return start, end, nil
+}
+
+// Client interface for incoming peer Streamer
+type Client interface {
+ NeedData([]byte) func()
+ BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
+ Close()
+}
+
+func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) {
+ if c.to > 0 && from >= c.to {
+ return 0, 0
+ }
+ if c.stream.Live {
+ return from, 0
+ } else if from >= c.sessionAt {
+ if c.to > 0 {
+ return from, c.to
+ }
+ return from, math.MaxUint64
+ }
+ nextFrom, nextTo, err := c.NextInterval()
+ if err != nil {
+ log.Error("next intervals", "stream", c.stream)
+ return
+ }
+ if nextTo > c.to {
+ nextTo = c.to
+ }
+ if nextTo == 0 {
+ nextTo = c.sessionAt
+ }
+ return
+}
+
+func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error {
+ if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil {
+ tp, err := tf()
+ if err != nil {
+ return err
+ }
+ if err := p.SendPriority(tp, c.priority); err != nil {
+ return err
+ }
+ if c.to > 0 && tp.Takeover.End >= c.to {
+ return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream)
+ }
+ return nil
+ }
+ // TODO: make a test case for testing if the interval is added when the batch is done
+ if err := c.AddInterval(req.From, req.To); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *client) close() {
+ select {
+ case <-c.quit:
+ default:
+ close(c.quit)
+ }
+ c.Close()
+}
+
+// clientParams store parameters for the new client
+// between a subscription and initial offered hashes request handling.
+type clientParams struct {
+ priority uint8
+ to uint64
+ // signal when the client is created
+ clientCreatedC chan struct{}
+}
+
+func newClientParams(priority uint8, to uint64) *clientParams {
+ return &clientParams{
+ priority: priority,
+ to: to,
+ clientCreatedC: make(chan struct{}),
+ }
+}
+
+func (c *clientParams) waitClient(ctx context.Context) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c.clientCreatedC:
+ return nil
+ }
+}
+
+func (c *clientParams) clientCreated() {
+ close(c.clientCreatedC)
+}
+
+// Spec is the spec of the streamer protocol
+var Spec = &protocols.Spec{
+ Name: "stream",
+ Version: 4,
+ MaxMsgSize: 10 * 1024 * 1024,
+ Messages: []interface{}{
+ UnsubscribeMsg{},
+ OfferedHashesMsg{},
+ WantedHashesMsg{},
+ TakeoverProofMsg{},
+ SubscribeMsg{},
+ RetrieveRequestMsg{},
+ ChunkDeliveryMsg{},
+ SubscribeErrorMsg{},
+ RequestSubscriptionMsg{},
+ QuitMsg{},
+ },
+}
+
+func (r *Registry) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{
+ {
+ Name: Spec.Name,
+ Version: Spec.Version,
+ Length: Spec.Length(),
+ Run: r.runProtocol,
+ // NodeInfo: ,
+ // PeerInfo: ,
+ },
+ }
+}
+
+func (r *Registry) APIs() []rpc.API {
+ return []rpc.API{
+ {
+ Namespace: "stream",
+ Version: "3.0",
+ Service: r.api,
+ Public: true,
+ },
+ }
+}
+
+func (r *Registry) Start(server *p2p.Server) error {
+ log.Info("Streamer started")
+ return nil
+}
+
+func (r *Registry) Stop() error {
+ return nil
+}
+
+type Range struct {
+ From, To uint64
+}
+
+func NewRange(from, to uint64) *Range {
+ return &Range{
+ From: from,
+ To: to,
+ }
+}
+
+func (r *Range) String() string {
+ return fmt.Sprintf("%v-%v", r.From, r.To)
+}
+
+func getHistoryPriority(priority uint8) uint8 {
+ if priority == 0 {
+ return 0
+ }
+ return priority - 1
+}
+
+func getHistoryStream(s Stream) Stream {
+ return NewStream(s.Name, s.Key, false)
+}
+
+type API struct {
+ streamer *Registry
+}
+
+func NewAPI(r *Registry) *API {
+ return &API{
+ streamer: r,
+ }
+}
+
+func (api *API) SubscribeStream(peerId discover.NodeID, s Stream, history *Range, priority uint8) error {
+ return api.streamer.Subscribe(peerId, s, history, priority)
+}
+
+func (api *API) UnsubscribeStream(peerId discover.NodeID, s Stream) error {
+ return api.streamer.Unsubscribe(peerId, s)
+}