diff options
Diffstat (limited to 'swarm/network/discovery.go')
-rw-r--r-- | swarm/network/discovery.go | 76 |
1 files changed, 34 insertions, 42 deletions
diff --git a/swarm/network/discovery.go b/swarm/network/discovery.go index 55bf7c033..301959480 100644 --- a/swarm/network/discovery.go +++ b/swarm/network/discovery.go @@ -26,30 +26,30 @@ import ( // discovery bzz extension for requesting and relaying node address records -// discPeer wraps BzzPeer and embeds an Overlay connectivity driver -type discPeer struct { +// Peer wraps BzzPeer and embeds Kademlia overlay connectivity driver +type Peer struct { *BzzPeer - overlay Overlay - sentPeers bool // whether we already sent peer closer to this address - mtx sync.RWMutex + kad *Kademlia + sentPeers bool // whether we already sent peer closer to this address + mtx sync.RWMutex // peers map[string]bool // tracks node records sent to the peer depth uint8 // the proximity order advertised by remote as depth of saturation } -// NewDiscovery constructs a discovery peer -func newDiscovery(p *BzzPeer, o Overlay) *discPeer { - d := &discPeer{ - overlay: o, +// NewPeer constructs a discovery peer +func NewPeer(p *BzzPeer, kad *Kademlia) *Peer { + d := &Peer{ + kad: kad, BzzPeer: p, peers: make(map[string]bool), } // record remote as seen so we never send a peer its own record - d.seen(d) + d.seen(p.BzzAddr) return d } // HandleMsg is the message handler that delegates incoming messages -func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { +func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *peersMsg: @@ -64,24 +64,18 @@ func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { } // NotifyDepth sends a message to all connections if depth of saturation is changed -func NotifyDepth(depth uint8, h Overlay) { - f := func(val OverlayConn, po int, _ bool) bool { - dp, ok := val.(*discPeer) - if ok { - dp.NotifyDepth(depth) - } +func NotifyDepth(depth uint8, kad *Kademlia) { + f := func(val *Peer, po int, _ bool) bool { + val.NotifyDepth(depth) return true } - h.EachConn(nil, 255, f) + kad.EachConn(nil, 255, f) } // NotifyPeer informs all peers about a newly added node -func NotifyPeer(p OverlayAddr, k Overlay) { - f := func(val OverlayConn, po int, _ bool) bool { - dp, ok := val.(*discPeer) - if ok { - dp.NotifyPeer(p, uint8(po)) - } +func NotifyPeer(p *BzzAddr, k *Kademlia) { + f := func(val *Peer, po int, _ bool) bool { + val.NotifyPeer(p, uint8(po)) return true } k.EachConn(p.Address(), 255, f) @@ -91,22 +85,20 @@ func NotifyPeer(p OverlayAddr, k Overlay) { // the peer's PO is within the recipients advertised depth // OR the peer is closer to the recipient than self // unless already notified during the connection session -func (d *discPeer) NotifyPeer(a OverlayAddr, po uint8) { +func (d *Peer) NotifyPeer(a *BzzAddr, po uint8) { // immediately return if (po < d.getDepth() && pot.ProxCmp(d.localAddr, d, a) != 1) || d.seen(a) { return } - // log.Trace(fmt.Sprintf("%08x peer %08x notified of peer %08x", d.localAddr.Over()[:4], d.Address()[:4], a.Address()[:4])) resp := &peersMsg{ - Peers: []*BzzAddr{ToAddr(a)}, + Peers: []*BzzAddr{a}, } go d.Send(context.TODO(), resp) } // NotifyDepth sends a subPeers Msg to the receiver notifying them about // a change in the depth of saturation -func (d *discPeer) NotifyDepth(po uint8) { - // log.Trace(fmt.Sprintf("%08x peer %08x notified of new depth %v", d.localAddr.Over()[:4], d.Address()[:4], po)) +func (d *Peer) NotifyDepth(po uint8) { go d.Send(context.TODO(), &subPeersMsg{Depth: po}) } @@ -141,7 +133,7 @@ func (msg peersMsg) String() string { // handlePeersMsg called by the protocol when receiving peerset (for target address) // list of nodes ([]PeerAddr in peersMsg) is added to the overlay db using the // Register interface method -func (d *discPeer) handlePeersMsg(msg *peersMsg) error { +func (d *Peer) handlePeersMsg(msg *peersMsg) error { // register all addresses if len(msg.Peers) == 0 { return nil @@ -149,12 +141,12 @@ func (d *discPeer) handlePeersMsg(msg *peersMsg) error { for _, a := range msg.Peers { d.seen(a) - NotifyPeer(a, d.overlay) + NotifyPeer(a, d.kad) } - return d.overlay.Register(toOverlayAddrs(msg.Peers...)) + return d.kad.Register(msg.Peers...) } -// subPeers msg is communicating the depth/sharpness/focus of the overlay table of a peer +// subPeers msg is communicating the depth of the overlay table of a peer type subPeersMsg struct { Depth uint8 } @@ -164,21 +156,20 @@ func (msg subPeersMsg) String() string { return fmt.Sprintf("%T: request peers > PO%02d. ", msg, msg.Depth) } -func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { +func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error { if !d.sentPeers { d.setDepth(msg.Depth) var peers []*BzzAddr - d.overlay.EachConn(d.Over(), 255, func(p OverlayConn, po int, isproxbin bool) bool { + d.kad.EachConn(d.Over(), 255, func(p *Peer, po int, isproxbin bool) bool { if pob, _ := pof(d, d.localAddr, 0); pob > po { return false } - if !d.seen(p) { - peers = append(peers, ToAddr(p.Off())) + if !d.seen(p.BzzAddr) { + peers = append(peers, p.BzzAddr) } return true }) if len(peers) > 0 { - // log.Debug(fmt.Sprintf("%08x: %v peers sent to %v", d.overlay.BaseAddr(), len(peers), d)) go d.Send(context.TODO(), &peersMsg{Peers: peers}) } } @@ -186,9 +177,9 @@ func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { return nil } -// seen takes an Overlay peer and checks if it was sent to a peer already +// seen takes an peer address and checks if it was sent to a peer already // if not, marks the peer as sent -func (d *discPeer) seen(p OverlayPeer) bool { +func (d *Peer) seen(p *BzzAddr) bool { d.mtx.Lock() defer d.mtx.Unlock() k := string(p.Address()) @@ -199,12 +190,13 @@ func (d *discPeer) seen(p OverlayPeer) bool { return false } -func (d *discPeer) getDepth() uint8 { +func (d *Peer) getDepth() uint8 { d.mtx.RLock() defer d.mtx.RUnlock() return d.depth } -func (d *discPeer) setDepth(depth uint8) { + +func (d *Peer) setDepth(depth uint8) { d.mtx.Lock() defer d.mtx.Unlock() d.depth = depth |