diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 319fc62c9..ea7cce8cb 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -25,7 +25,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/log" @@ -48,15 +48,15 @@ const ( // Registry registry for outgoing and incoming streamer constructors type Registry struct { + addr enode.ID api *API - addr *network.BzzAddr skipCheck bool clientMu sync.RWMutex serverMu sync.RWMutex peersMu sync.RWMutex serverFuncs map[string]func(*Peer, string, bool) (Server, error) clientFuncs map[string]func(*Peer, string, bool) (Client, error) - peers map[discover.NodeID]*Peer + peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store doRetrieve bool @@ -71,7 +71,7 @@ type RegistryOptions struct { } // NewRegistry is Streamer constructor -func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { +func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { if options == nil { options = &RegistryOptions{} } @@ -79,11 +79,11 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore stora options.SyncUpdateDelay = 15 * time.Second } streamer := &Registry{ - addr: addr, + addr: localID, skipCheck: options.SkipCheck, serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), - peers: make(map[discover.NodeID]*Peer), + peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, doRetrieve: options.DoRetrieve, @@ -220,7 +220,7 @@ func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Serv return f, nil } -func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Range, prio uint8) error { +func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error { // check if the stream is registered if _, err := r.GetServerFunc(s.Name); err != nil { return err @@ -248,7 +248,7 @@ func (r *Registry) RequestSubscription(peerId discover.NodeID, s Stream, h *Rang } // Subscribe initiates the streamer -func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priority uint8) error { +func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error { // check if the stream is registered if _, err := r.GetClientFunc(s.Name); err != nil { return err @@ -288,7 +288,7 @@ func (r *Registry) Subscribe(peerId discover.NodeID, s Stream, h *Range, priorit return peer.SendPriority(context.TODO(), msg, priority) } -func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { +func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { peer := r.getPeer(peerId) if peer == nil { return fmt.Errorf("peer not found %v", peerId) @@ -307,7 +307,7 @@ func (r *Registry) Unsubscribe(peerId discover.NodeID, s Stream) error { // Quit sends the QuitMsg to the peer to remove the // stream peer client and terminate the streaming. -func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { +func (r *Registry) Quit(peerId enode.ID, s Stream) error { peer := r.getPeer(peerId) if peer == nil { log.Debug("stream quit: peer not found", "peer", peerId, "stream", s) @@ -327,7 +327,7 @@ func (r *Registry) NodeInfo() interface{} { return nil } -func (r *Registry) PeerInfo(id discover.NodeID) interface{} { +func (r *Registry) PeerInfo(id enode.ID) interface{} { return nil } @@ -335,7 +335,7 @@ func (r *Registry) Close() error { return r.intervalsStore.Close() } -func (r *Registry) getPeer(peerId discover.NodeID) *Peer { +func (r *Registry) getPeer(peerId enode.ID) *Peer { r.peersMu.RLock() defer r.peersMu.RUnlock() @@ -390,7 +390,7 @@ func (r *Registry) updateSyncing() { // map of all SYNC streams for all peers // used at the and of the function to remove servers // that are not needed anymore - subs := make(map[discover.NodeID]map[Stream]struct{}) + subs := make(map[enode.ID]map[Stream]struct{}) r.peersMu.RLock() for id, peer := range r.peers { peer.serverMu.RLock() @@ -407,8 +407,8 @@ func (r *Registry) updateSyncing() { r.peersMu.RUnlock() // request subscriptions for all nodes and bins - kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool { - log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin)) + kad.EachBin(r.addr[:], pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool { + log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin)) // bin is always less then 256 and it is safe to convert it to type uint8 stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true) @@ -446,7 +446,7 @@ func (r *Registry) updateSyncing() { func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := protocols.NewPeer(p, rw, Spec) - bp := network.NewBzzPeer(peer, r.addr) + bp := network.NewBzzPeer(peer) np := network.NewPeer(bp, r.delivery.kad) r.delivery.kad.On(np) defer r.delivery.kad.Off(np) @@ -724,10 +724,10 @@ func NewAPI(r *Registry) *API { } } -func (api *API) SubscribeStream(peerId discover.NodeID, s Stream, history *Range, priority uint8) error { +func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error { return api.streamer.Subscribe(peerId, s, history, priority) } -func (api *API) UnsubscribeStream(peerId discover.NodeID, s Stream) error { +func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { return api.streamer.Unsubscribe(peerId, s) } |