diff options
author | Viktor Trón <viktor.tron@gmail.com> | 2018-09-12 17:24:56 +0800 |
---|---|---|
committer | Balint Gabor <balint.g@gmail.com> | 2018-09-12 17:24:56 +0800 |
commit | bfce00385f1c8dab222b7ddab6c336177a5ae731 (patch) | |
tree | 8b3d2b2ce30e8b5eaf6db5b89a6c5570c3997cff /swarm/network | |
parent | b06ff563a1f1095407612e04a1707e13d2dc20da (diff) | |
download | go-tangerine-bfce00385f1c8dab222b7ddab6c336177a5ae731.tar.gz go-tangerine-bfce00385f1c8dab222b7ddab6c336177a5ae731.tar.zst go-tangerine-bfce00385f1c8dab222b7ddab6c336177a5ae731.zip |
Kademlia refactor (#17641)
* swarm/network: simplify kademlia/hive; rid interfaces
* swarm, swarm/network/stream, swarm/netork/simulations,, swarm/pss: adapt to new Kad API
* swarm/network: minor changes re review; add missing lock to NeighbourhoodDepthC
Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/discovery.go | 76 | ||||
-rw-r--r-- | swarm/network/discovery_test.go | 2 | ||||
-rw-r--r-- | swarm/network/hive.go | 71 | ||||
-rw-r--r-- | swarm/network/hive_test.go | 8 | ||||
-rw-r--r-- | swarm/network/kademlia.go | 154 | ||||
-rw-r--r-- | swarm/network/kademlia_test.go | 183 | ||||
-rw-r--r-- | swarm/network/networkid_test.go | 2 | ||||
-rw-r--r-- | swarm/network/protocol.go | 47 | ||||
-rw-r--r-- | swarm/network/simulations/discovery/discovery_test.go | 4 | ||||
-rw-r--r-- | swarm/network/stream/delivery.go | 12 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 9 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 18 |
12 files changed, 225 insertions, 361 deletions
diff --git a/swarm/network/discovery.go b/swarm/network/discovery.go index 55bf7c033..301959480 100644 --- a/swarm/network/discovery.go +++ b/swarm/network/discovery.go @@ -26,30 +26,30 @@ import ( // discovery bzz extension for requesting and relaying node address records -// discPeer wraps BzzPeer and embeds an Overlay connectivity driver -type discPeer struct { +// Peer wraps BzzPeer and embeds Kademlia overlay connectivity driver +type Peer struct { *BzzPeer - overlay Overlay - sentPeers bool // whether we already sent peer closer to this address - mtx sync.RWMutex + kad *Kademlia + sentPeers bool // whether we already sent peer closer to this address + mtx sync.RWMutex // peers map[string]bool // tracks node records sent to the peer depth uint8 // the proximity order advertised by remote as depth of saturation } -// NewDiscovery constructs a discovery peer -func newDiscovery(p *BzzPeer, o Overlay) *discPeer { - d := &discPeer{ - overlay: o, +// NewPeer constructs a discovery peer +func NewPeer(p *BzzPeer, kad *Kademlia) *Peer { + d := &Peer{ + kad: kad, BzzPeer: p, peers: make(map[string]bool), } // record remote as seen so we never send a peer its own record - d.seen(d) + d.seen(p.BzzAddr) return d } // HandleMsg is the message handler that delegates incoming messages -func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { +func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *peersMsg: @@ -64,24 +64,18 @@ func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { } // NotifyDepth sends a message to all connections if depth of saturation is changed -func NotifyDepth(depth uint8, h Overlay) { - f := func(val OverlayConn, po int, _ bool) bool { - dp, ok := val.(*discPeer) - if ok { - dp.NotifyDepth(depth) - } +func NotifyDepth(depth uint8, kad *Kademlia) { + f := func(val *Peer, po int, _ bool) bool { + val.NotifyDepth(depth) return true } - h.EachConn(nil, 255, f) + kad.EachConn(nil, 255, f) } // NotifyPeer informs all peers about a newly added node -func NotifyPeer(p OverlayAddr, k Overlay) { - f := func(val OverlayConn, po int, _ bool) bool { - dp, ok := val.(*discPeer) - if ok { - dp.NotifyPeer(p, uint8(po)) - } +func NotifyPeer(p *BzzAddr, k *Kademlia) { + f := func(val *Peer, po int, _ bool) bool { + val.NotifyPeer(p, uint8(po)) return true } k.EachConn(p.Address(), 255, f) @@ -91,22 +85,20 @@ func NotifyPeer(p OverlayAddr, k Overlay) { // the peer's PO is within the recipients advertised depth // OR the peer is closer to the recipient than self // unless already notified during the connection session -func (d *discPeer) NotifyPeer(a OverlayAddr, po uint8) { +func (d *Peer) NotifyPeer(a *BzzAddr, po uint8) { // immediately return if (po < d.getDepth() && pot.ProxCmp(d.localAddr, d, a) != 1) || d.seen(a) { return } - // log.Trace(fmt.Sprintf("%08x peer %08x notified of peer %08x", d.localAddr.Over()[:4], d.Address()[:4], a.Address()[:4])) resp := &peersMsg{ - Peers: []*BzzAddr{ToAddr(a)}, + Peers: []*BzzAddr{a}, } go d.Send(context.TODO(), resp) } // NotifyDepth sends a subPeers Msg to the receiver notifying them about // a change in the depth of saturation -func (d *discPeer) NotifyDepth(po uint8) { - // log.Trace(fmt.Sprintf("%08x peer %08x notified of new depth %v", d.localAddr.Over()[:4], d.Address()[:4], po)) +func (d *Peer) NotifyDepth(po uint8) { go d.Send(context.TODO(), &subPeersMsg{Depth: po}) } @@ -141,7 +133,7 @@ func (msg peersMsg) String() string { // handlePeersMsg called by the protocol when receiving peerset (for target address) // list of nodes ([]PeerAddr in peersMsg) is added to the overlay db using the // Register interface method -func (d *discPeer) handlePeersMsg(msg *peersMsg) error { +func (d *Peer) handlePeersMsg(msg *peersMsg) error { // register all addresses if len(msg.Peers) == 0 { return nil @@ -149,12 +141,12 @@ func (d *discPeer) handlePeersMsg(msg *peersMsg) error { for _, a := range msg.Peers { d.seen(a) - NotifyPeer(a, d.overlay) + NotifyPeer(a, d.kad) } - return d.overlay.Register(toOverlayAddrs(msg.Peers...)) + return d.kad.Register(msg.Peers...) } -// subPeers msg is communicating the depth/sharpness/focus of the overlay table of a peer +// subPeers msg is communicating the depth of the overlay table of a peer type subPeersMsg struct { Depth uint8 } @@ -164,21 +156,20 @@ func (msg subPeersMsg) String() string { return fmt.Sprintf("%T: request peers > PO%02d. ", msg, msg.Depth) } -func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { +func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error { if !d.sentPeers { d.setDepth(msg.Depth) var peers []*BzzAddr - d.overlay.EachConn(d.Over(), 255, func(p OverlayConn, po int, isproxbin bool) bool { + d.kad.EachConn(d.Over(), 255, func(p *Peer, po int, isproxbin bool) bool { if pob, _ := pof(d, d.localAddr, 0); pob > po { return false } - if !d.seen(p) { - peers = append(peers, ToAddr(p.Off())) + if !d.seen(p.BzzAddr) { + peers = append(peers, p.BzzAddr) } return true }) if len(peers) > 0 { - // log.Debug(fmt.Sprintf("%08x: %v peers sent to %v", d.overlay.BaseAddr(), len(peers), d)) go d.Send(context.TODO(), &peersMsg{Peers: peers}) } } @@ -186,9 +177,9 @@ func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { return nil } -// seen takes an Overlay peer and checks if it was sent to a peer already +// seen takes an peer address and checks if it was sent to a peer already // if not, marks the peer as sent -func (d *discPeer) seen(p OverlayPeer) bool { +func (d *Peer) seen(p *BzzAddr) bool { d.mtx.Lock() defer d.mtx.Unlock() k := string(p.Address()) @@ -199,12 +190,13 @@ func (d *discPeer) seen(p OverlayPeer) bool { return false } -func (d *discPeer) getDepth() uint8 { +func (d *Peer) getDepth() uint8 { d.mtx.RLock() defer d.mtx.RUnlock() return d.depth } -func (d *discPeer) setDepth(depth uint8) { + +func (d *Peer) setDepth(depth uint8) { d.mtx.Lock() defer d.mtx.Unlock() d.depth = depth diff --git a/swarm/network/discovery_test.go b/swarm/network/discovery_test.go index 0427d81ca..494bc8196 100644 --- a/swarm/network/discovery_test.go +++ b/swarm/network/discovery_test.go @@ -33,7 +33,7 @@ func TestDiscovery(t *testing.T) { id := s.IDs[0] raddr := NewAddrFromNodeID(id) - pp.Register([]OverlayAddr{OverlayAddr(raddr)}) + pp.Register(raddr) // start the hive and wait for the connection pp.Start(s.Server) diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 366021088..425c1d5a1 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -32,31 +32,10 @@ import ( Hive is the logistic manager of the swarm When the hive is started, a forever loop is launched that -asks the Overlay Topology driver (e.g., generic kademlia nodetable) +asks the kademlia nodetable to suggest peers to bootstrap connectivity */ -// Overlay is the interface for kademlia (or other topology drivers) -type Overlay interface { - // suggest peers to connect to - SuggestPeer() (OverlayAddr, int, bool) - // register and deregister peer connections - On(OverlayConn) (depth uint8, changed bool) - Off(OverlayConn) - // register peer addresses - Register([]OverlayAddr) error - // iterate over connected peers - EachConn([]byte, int, func(OverlayConn, int, bool) bool) - // iterate over known peers (address records) - EachAddr([]byte, int, func(OverlayAddr, int, bool) bool) - // pretty print the connectivity - String() string - // base Overlay address of the node itself - BaseAddr() []byte - // connectivity health check used for testing - Healthy(*PeerPot) *Health -} - // HiveParams holds the config options to hive type HiveParams struct { Discovery bool // if want discovery of not @@ -78,7 +57,7 @@ func NewHiveParams() *HiveParams { // Hive manages network connections of the swarm node type Hive struct { *HiveParams // settings - Overlay // the overlay connectiviy driver + *Kademlia // the overlay connectiviy driver Store state.Store // storage interface to save peers across sessions addPeer func(*discover.Node) // server callback to connect to a peer // bookkeeping @@ -88,12 +67,12 @@ type Hive struct { // NewHive constructs a new hive // HiveParams: config parameters -// Overlay: connectivity driver using a network topology +// Kademlia: connectivity driver using a network topology // StateStore: to save peers across sessions -func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive { +func NewHive(params *HiveParams, kad *Kademlia, store state.Store) *Hive { return &Hive{ HiveParams: params, - Overlay: overlay, + Kademlia: kad, Store: store, } } @@ -133,7 +112,7 @@ func (h *Hive) Stop() error { } } log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) - h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool { + h.EachConn(nil, 255, func(p *Peer, _ int, _ bool) bool { log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) p.Drop(nil) return true @@ -151,14 +130,14 @@ func (h *Hive) connect() { addr, depth, changed := h.SuggestPeer() if h.Discovery && changed { - NotifyDepth(uint8(depth), h) + NotifyDepth(uint8(depth), h.Kademlia) } if addr == nil { continue } log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4])) - under, err := discover.ParseNode(string(addr.(Addr).Under())) + under, err := discover.ParseNode(string(addr.Under())) if err != nil { log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err)) continue @@ -170,19 +149,19 @@ func (h *Hive) connect() { // Run protocol run function func (h *Hive) Run(p *BzzPeer) error { - dp := newDiscovery(p, h) + dp := NewPeer(p, h.Kademlia) depth, changed := h.On(dp) // if we want discovery, advertise change of depth if h.Discovery { if changed { // if depth changed, send to all peers - NotifyDepth(depth, h) + NotifyDepth(depth, h.Kademlia) } else { // otherwise just send depth to new peer dp.NotifyDepth(depth) } } - NotifyPeer(p.Off(), h) + NotifyPeer(p.BzzAddr, h.Kademlia) defer h.Off(dp) return dp.Run(dp.HandleMsg) } @@ -206,17 +185,6 @@ func (h *Hive) PeerInfo(id discover.NodeID) interface{} { } } -// ToAddr returns the serialisable version of u -func ToAddr(pa OverlayPeer) *BzzAddr { - if addr, ok := pa.(*BzzAddr); ok { - return addr - } - if p, ok := pa.(*discPeer); ok { - return p.BzzAddr - } - return pa.(*BzzPeer).BzzAddr -} - // loadPeers, savePeer implement persistence callback/ func (h *Hive) loadPeers() error { var as []*BzzAddr @@ -230,28 +198,19 @@ func (h *Hive) loadPeers() error { } log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4])) - return h.Register(toOverlayAddrs(as...)) -} - -// toOverlayAddrs transforms an array of BzzAddr to OverlayAddr -func toOverlayAddrs(as ...*BzzAddr) (oas []OverlayAddr) { - for _, a := range as { - oas = append(oas, OverlayAddr(a)) - } - return + return h.Register(as...) } // savePeers, savePeer implement persistence callback/ func (h *Hive) savePeers() error { var peers []*BzzAddr - h.Overlay.EachAddr(nil, 256, func(pa OverlayAddr, i int, _ bool) bool { + h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int, _ bool) bool { if pa == nil { log.Warn(fmt.Sprintf("empty addr: %v", i)) return true } - apa := ToAddr(pa) - log.Trace("saving peer", "peer", apa) - peers = append(peers, apa) + log.Trace("saving peer", "peer", pa) + peers = append(peers, pa) return true }) if err := h.Store.Put("peers", peers); err != nil { diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go index c2abfb2aa..7ea000c1a 100644 --- a/swarm/network/hive_test.go +++ b/swarm/network/hive_test.go @@ -41,7 +41,7 @@ func TestRegisterAndConnect(t *testing.T) { id := s.IDs[0] raddr := NewAddrFromNodeID(id) - pp.Register([]OverlayAddr{OverlayAddr(raddr)}) + pp.Register(raddr) // start the hive and wait for the connection err := pp.Start(s.Server) @@ -77,7 +77,7 @@ func TestHiveStatePersistance(t *testing.T) { peers := make(map[string]bool) for _, id := range s.IDs { raddr := NewAddrFromNodeID(id) - pp.Register([]OverlayAddr{OverlayAddr(raddr)}) + pp.Register(raddr) peers[raddr.String()] = true } @@ -97,8 +97,8 @@ func TestHiveStatePersistance(t *testing.T) { pp.Start(s1.Server) i := 0 - pp.Overlay.EachAddr(nil, 256, func(addr OverlayAddr, po int, nn bool) bool { - delete(peers, addr.(*BzzAddr).String()) + pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int, nn bool) bool { + delete(peers, addr.String()) i++ return true }) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 0177d449c..55a0c6f13 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -62,7 +62,7 @@ type KadParams struct { RetryExponent int // exponent to multiply retry intervals with MaxRetries int // maximum number of redial attempts // function to sanction or prevent suggesting a peer - Reachable func(OverlayAddr) bool + Reachable func(*BzzAddr) bool } // NewKadParams returns a params struct with default values @@ -106,45 +106,22 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia { } } -// OverlayPeer interface captures the common aspect of view of a peer from the Overlay -// topology driver -type OverlayPeer interface { - Address() []byte -} - -// OverlayConn represents a connected peer -type OverlayConn interface { - OverlayPeer - Drop(error) // call to indicate a peer should be expunged - Off() OverlayAddr // call to return a persitent OverlayAddr -} - -// OverlayAddr represents a kademlia peer record -type OverlayAddr interface { - OverlayPeer - Update(OverlayAddr) OverlayAddr // returns the updated version of the original -} - -// entry represents a Kademlia table entry (an extension of OverlayPeer) +// entry represents a Kademlia table entry (an extension of BzzAddr) type entry struct { - OverlayPeer + *BzzAddr + conn *Peer seenAt time.Time retries int } -// newEntry creates a kademlia peer from an OverlayPeer interface -func newEntry(p OverlayPeer) *entry { +// newEntry creates a kademlia peer from a *Peer +func newEntry(p *BzzAddr) *entry { return &entry{ - OverlayPeer: p, - seenAt: time.Now(), + BzzAddr: p, + seenAt: time.Now(), } } -// Bin is the binary (bitvector) serialisation of the entry address -func (e *entry) Bin() string { - return pot.ToBin(e.addr().Address()) -} - // Label is a short tag for the entry for debug func Label(e *entry) string { return fmt.Sprintf("%s (%d)", e.Hex()[:4], e.retries) @@ -152,29 +129,12 @@ func Label(e *entry) string { // Hex is the hexadecimal serialisation of the entry address func (e *entry) Hex() string { - return fmt.Sprintf("%x", e.addr().Address()) + return fmt.Sprintf("%x", e.Address()) } -// String is the short tag for the entry -func (e *entry) String() string { - return fmt.Sprintf("%s (%d)", e.Hex()[:8], e.retries) -} - -// addr returns the kad peer record (OverlayAddr) corresponding to the entry -func (e *entry) addr() OverlayAddr { - a, _ := e.OverlayPeer.(OverlayAddr) - return a -} - -// conn returns the connected peer (OverlayPeer) corresponding to the entry -func (e *entry) conn() OverlayConn { - c, _ := e.OverlayPeer.(OverlayConn) - return c -} - -// Register enters each OverlayAddr as kademlia peer record into the +// Register enters each address as kademlia peer record into the // database of known peer addresses -func (k *Kademlia) Register(peers []OverlayAddr) error { +func (k *Kademlia) Register(peers ...*BzzAddr) error { k.lock.Lock() defer k.lock.Unlock() var known, size int @@ -203,7 +163,6 @@ func (k *Kademlia) Register(peers []OverlayAddr) error { if k.addrCountC != nil && size-known > 0 { k.addrCountC <- k.addrs.Size() } - // log.Trace(fmt.Sprintf("%x registered %v peers, %v known, total: %v", k.BaseAddr()[:4], size, known, k.addrs.Size())) k.sendNeighbourhoodDepthChange() return nil @@ -212,7 +171,7 @@ func (k *Kademlia) Register(peers []OverlayAddr) error { // SuggestPeer returns a known peer for the lowest proximity bin for the // lowest bincount below depth // naturally if there is an empty row it returns a peer for that -func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { +func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) { k.lock.Lock() defer k.lock.Unlock() minsize := k.MinBinSize @@ -224,15 +183,18 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { if po < depth { return false } - a = k.callable(val) + e := val.(*entry) + c := k.callable(e) + if c { + a = e.BzzAddr + } ppo = po - return a == nil + return !c }) if a != nil { log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo)) return a, 0, false } - // log.Trace(fmt.Sprintf("%08x no candidate nearest neighbours to connect to (Depth: %v, minProxSize: %v) %#v", k.BaseAddr()[:4], depth, k.MinProxBinSize, a)) var bpo []int prev := -1 @@ -250,7 +212,6 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { }) // all buckets are full, ie., minsize == k.MinBinSize if len(bpo) == 0 { - // log.Debug(fmt.Sprintf("%08x: all bins saturated", k.BaseAddr()[:4])) return nil, 0, false } // as long as we got candidate peers to connect to @@ -264,8 +225,12 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { return false } return f(func(val pot.Val, _ int) bool { - a = k.callable(val) - return a == nil + e := val.(*entry) + c := k.callable(e) + if c { + a = e.BzzAddr + } + return !c }) }) // found a candidate @@ -282,25 +247,26 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { } // On inserts the peer as a kademlia peer into the live peers -func (k *Kademlia) On(p OverlayConn) (uint8, bool) { +func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() - e := newEntry(p) var ins bool k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(v pot.Val) pot.Val { // if not found live if v == nil { ins = true // insert new online peer into conns - return e + return p } // found among live peers, do nothing return v }) if ins { + a := newEntry(p.BzzAddr) + a.conn = p // insert new online peer into addrs k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val { - return e + return a }) // send new address count value only if the peer is inserted if k.addrCountC != nil { @@ -324,6 +290,8 @@ func (k *Kademlia) On(p OverlayConn) (uint8, bool) { // Not receiving from the returned channel will block On function // when the neighbourhood depth is changed. func (k *Kademlia) NeighbourhoodDepthC() <-chan int { + k.lock.Lock() + defer k.lock.Unlock() if k.nDepthC == nil { k.nDepthC = make(chan int) } @@ -357,7 +325,7 @@ func (k *Kademlia) AddrCountC() <-chan int { } // Off removes a peer from among live peers -func (k *Kademlia) Off(p OverlayConn) { +func (k *Kademlia) Off(p *Peer) { k.lock.Lock() defer k.lock.Unlock() var del bool @@ -367,7 +335,7 @@ func (k *Kademlia) Off(p OverlayConn) { panic(fmt.Sprintf("connected peer not found %v", p)) } del = true - return newEntry(p.Off()) + return newEntry(p.BzzAddr) }) if del { @@ -383,7 +351,7 @@ func (k *Kademlia) Off(p OverlayConn) { } } -func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn OverlayConn, po int) bool) { +func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn *Peer, po int) bool) { k.lock.RLock() defer k.lock.RUnlock() @@ -403,7 +371,7 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con for bin := startPo; bin <= endPo; bin++ { f(func(val pot.Val, _ int) bool { - return eachBinFunc(val.(*entry).conn(), bin) + return eachBinFunc(val.(*Peer), bin) }) } return true @@ -413,13 +381,13 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con // EachConn is an iterator with args (base, po, f) applies f to each live peer // that has proximity order po or less as measured from the base // if base is nil, kademlia base address is used -func (k *Kademlia) EachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) { +func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int, bool) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachConn(base, o, f) } -func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) { +func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) { if len(base) == 0 { base = k.base } @@ -428,20 +396,20 @@ func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) b if po > o { return true } - return f(val.(*entry).conn(), po, po >= depth) + return f(val.(*Peer), po, po >= depth) }) } // EachAddr called with (base, po, f) is an iterator applying f to each known peer // that has proximity order po or less as measured from the base // if base is nil, kademlia base address is used -func (k *Kademlia) EachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) { +func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachAddr(base, o, f) } -func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) { +func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { if len(base) == 0 { base = k.base } @@ -450,7 +418,7 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) b if po > o { return true } - return f(val.(*entry).addr(), po, po >= depth) + return f(val.(*entry).BzzAddr, po, po >= depth) }) } @@ -472,12 +440,11 @@ func (k *Kademlia) neighbourhoodDepth() (depth int) { return depth } -// callable when called with val, -func (k *Kademlia) callable(val pot.Val) OverlayAddr { - e := val.(*entry) +// callable decides if an address entry represents a callable peer +func (k *Kademlia) callable(e *entry) bool { // not callable if peer is live or exceeded maxRetries - if e.conn() != nil || e.retries > k.MaxRetries { - return nil + if e.conn != nil || e.retries > k.MaxRetries { + return false } // calculate the allowed number of retries based on time lapsed since last seen timeAgo := int64(time.Since(e.seenAt)) @@ -491,17 +458,17 @@ func (k *Kademlia) callable(val pot.Val) OverlayAddr { // peer can be retried again if retries < e.retries { log.Trace(fmt.Sprintf("%08x: %v long time since last try (at %v) needed before retry %v, wait only warrants %v", k.BaseAddr()[:4], e, timeAgo, e.retries, retries)) - return nil + return false } // function to sanction or prevent suggesting a peer - if k.Reachable != nil && !k.Reachable(e.addr()) { + if k.Reachable != nil && !k.Reachable(e.BzzAddr) { log.Trace(fmt.Sprintf("%08x: peer %v is temporarily not callable", k.BaseAddr()[:4], e)) - return nil + return false } e.retries++ log.Trace(fmt.Sprintf("%08x: peer %v is callable", k.BaseAddr()[:4], e)) - return e.addr() + return true } // BaseAddr return the kademlia base address @@ -516,7 +483,8 @@ func (k *Kademlia) String() string { return k.string() } -// String returns kademlia table + kaddb table displayed with ascii +// string returns kademlia table + kaddb table displayed with ascii +// caller must hold the lock func (k *Kademlia) string() string { wsrow := " " var rows []string @@ -538,7 +506,7 @@ func (k *Kademlia) string() string { row := []string{fmt.Sprintf("%2d", size)} rest -= size f(func(val pot.Val, vpo int) bool { - e := val.(*entry) + e := val.(*Peer) row = append(row, fmt.Sprintf("%x", e.Address()[:2])) rowlen++ return rowlen < 4 @@ -594,8 +562,9 @@ type PeerPot struct { EmptyBins []int } -// NewPeerPotMap creates a map of pot record of OverlayAddr with keys +// NewPeerPotMap creates a map of pot record of *BzzAddr with keys // as hexadecimal representations of the address. +// used for testing only func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { // create a table of all nodes for health check np := pot.NewPot(nil, 0) @@ -640,6 +609,7 @@ func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { // saturation returns the lowest proximity order that the bin for that order // has less than n peers +// It is used in Healthy function for testing only func (k *Kademlia) saturation(n int) int { prev := -1 k.addrs.EachBin(k.base, pof, 0, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool { @@ -654,7 +624,7 @@ func (k *Kademlia) saturation(n int) int { } // full returns true if all required bins have connected peers. -// It is used in Healthy function. +// It is used in Healthy function for testing only func (k *Kademlia) full(emptyBins []int) (full bool) { prev := 0 e := len(emptyBins) @@ -688,10 +658,13 @@ func (k *Kademlia) full(emptyBins []int) (full bool) { return e == 0 } +// knowNearestNeighbours tests if all known nearest neighbours given as arguments +// are found in the addressbook +// It is used in Healthy function for testing only func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool { pm := make(map[string]bool) - k.eachAddr(nil, 255, func(p OverlayAddr, po int, nn bool) bool { + k.eachAddr(nil, 255, func(p *BzzAddr, po int, nn bool) bool { if !nn { return false } @@ -709,10 +682,13 @@ func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool { return true } +// gotNearestNeighbours tests if all known nearest neighbours given as arguments +// are connected peers +// It is used in Healthy function for testing only func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) { pm := make(map[string]bool) - k.eachConn(nil, 255, func(p OverlayConn, po int, nn bool) bool { + k.eachConn(nil, 255, func(p *Peer, po int, nn bool) bool { if !nn { return false } @@ -735,6 +711,7 @@ func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missin } // Health state of the Kademlia +// used for testing only type Health struct { KnowNN bool // whether node knows all its nearest neighbours GotNN bool // whether node is connected to all its nearest neighbours @@ -746,6 +723,7 @@ type Health struct { // Healthy reports the health state of the kademlia connectivity // returns a Health struct +// used for testing only func (k *Kademlia) Healthy(pp *PeerPot) *Health { k.lock.RLock() defer k.lock.RUnlock() diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index b60e1e9a3..903c8dbda 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -38,71 +38,42 @@ func testKadPeerAddr(s string) *BzzAddr { return &BzzAddr{OAddr: a, UAddr: a} } -type testDropPeer struct { - Peer - dropc chan error -} - -type dropError struct { - error - addr string -} - -func (d *testDropPeer) Drop(err error) { - err2 := &dropError{err, binStr(d)} - d.dropc <- err2 -} - -type testKademlia struct { - *Kademlia - Discovery bool - dropc chan error -} - -func newTestKademlia(b string) *testKademlia { +func newTestKademlia(b string) *Kademlia { params := NewKadParams() params.MinBinSize = 1 params.MinProxBinSize = 2 base := pot.NewAddressFromString(b) - return &testKademlia{ - NewKademlia(base, params), - false, - make(chan error), - } + return NewKademlia(base, params) } -func (k *testKademlia) newTestKadPeer(s string) Peer { - return &testDropPeer{&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k.dropc} +func newTestKadPeer(k *Kademlia, s string) *Peer { + return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k) } -func (k *testKademlia) On(ons ...string) *testKademlia { +func On(k *Kademlia, ons ...string) { for _, s := range ons { - k.Kademlia.On(k.newTestKadPeer(s).(OverlayConn)) + k.On(newTestKadPeer(k, s)) } - return k } -func (k *testKademlia) Off(offs ...string) *testKademlia { +func Off(k *Kademlia, offs ...string) { for _, s := range offs { - k.Kademlia.Off(k.newTestKadPeer(s).(OverlayConn)) + k.Off(newTestKadPeer(k, s)) } - - return k } -func (k *testKademlia) Register(regs ...string) *testKademlia { - var as []OverlayAddr +func Register(k *Kademlia, regs ...string) { + var as []*BzzAddr for _, s := range regs { as = append(as, testKadPeerAddr(s)) } - err := k.Kademlia.Register(as) + err := k.Register(as...) if err != nil { panic(err.Error()) } - return k } -func testSuggestPeer(t *testing.T, k *testKademlia, expAddr string, expPo int, expWant bool) error { +func testSuggestPeer(k *Kademlia, expAddr string, expPo int, expWant bool) error { addr, o, want := k.SuggestPeer() if binStr(addr) != expAddr { return fmt.Errorf("incorrect peer address suggested. expected %v, got %v", expAddr, binStr(addr)) @@ -116,7 +87,7 @@ func testSuggestPeer(t *testing.T, k *testKademlia, expAddr string, expPo int, e return nil } -func binStr(a OverlayPeer) string { +func binStr(a *BzzAddr) string { if a == nil { return "<nil>" } @@ -125,15 +96,17 @@ func binStr(a OverlayPeer) string { func TestSuggestPeerBug(t *testing.T) { // 2 row gap, unsaturated proxbin, no callables -> want PO 0 - k := newTestKademlia("00000000").On( + k := newTestKademlia("00000000") + On(k, "10000000", "11000000", "01000000", "00010000", "00011000", - ).Off( + ) + Off(k, "01000000", ) - err := testSuggestPeer(t, k, "01000000", 0, false) + err := testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } @@ -141,140 +114,140 @@ func TestSuggestPeerBug(t *testing.T) { func TestSuggestPeerFindPeers(t *testing.T) { // 2 row gap, unsaturated proxbin, no callables -> want PO 0 - k := newTestKademlia("00000000").On("00100000") - err := testSuggestPeer(t, k, "<nil>", 0, false) + k := newTestKademlia("00000000") + On(k, "00100000") + err := testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // 2 row gap, saturated proxbin, no callables -> want PO 0 - k.On("00010000") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "00010000") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // 1 row gap (1 less), saturated proxbin, no callables -> want PO 1 - k.On("10000000") - err = testSuggestPeer(t, k, "<nil>", 1, false) + On(k, "10000000") + err = testSuggestPeer(k, "<nil>", 1, false) if err != nil { t.Fatal(err.Error()) } // no gap (1 less), saturated proxbin, no callables -> do not want more - k.On("01000000", "00100001") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "01000000", "00100001") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // oversaturated proxbin, > do not want more - k.On("00100001") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "00100001") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // reintroduce gap, disconnected peer callable - // log.Info(k.String()) - k.Off("01000000") - err = testSuggestPeer(t, k, "01000000", 0, false) + Off(k, "01000000") + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } // second time disconnected peer not callable // with reasonably set Interval - err = testSuggestPeer(t, k, "<nil>", 1, true) + err = testSuggestPeer(k, "<nil>", 1, true) if err != nil { t.Fatal(err.Error()) } // on and off again, peer callable again - k.On("01000000") - k.Off("01000000") - err = testSuggestPeer(t, k, "01000000", 0, false) + On(k, "01000000") + Off(k, "01000000") + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("01000000") + On(k, "01000000") // new closer peer appears, it is immediately wanted - k.Register("00010001") - err = testSuggestPeer(t, k, "00010001", 0, false) + Register(k, "00010001") + err = testSuggestPeer(k, "00010001", 0, false) if err != nil { t.Fatal(err.Error()) } // PO1 disconnects - k.On("00010001") + On(k, "00010001") log.Info(k.String()) - k.Off("01000000") + Off(k, "01000000") log.Info(k.String()) // second time, gap filling - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("01000000") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "01000000") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } k.MinBinSize = 2 - err = testSuggestPeer(t, k, "<nil>", 0, true) + err = testSuggestPeer(k, "<nil>", 0, true) if err != nil { t.Fatal(err.Error()) } - k.Register("01000001") - err = testSuggestPeer(t, k, "01000001", 0, false) + Register(k, "01000001") + err = testSuggestPeer(k, "01000001", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("10000001") + On(k, "10000001") log.Trace(fmt.Sprintf("Kad:\n%v", k.String())) - err = testSuggestPeer(t, k, "<nil>", 1, true) + err = testSuggestPeer(k, "<nil>", 1, true) if err != nil { t.Fatal(err.Error()) } - k.On("01000001") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "01000001") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } k.MinBinSize = 3 - k.Register("10000010") - err = testSuggestPeer(t, k, "10000010", 0, false) + Register(k, "10000010") + err = testSuggestPeer(k, "10000010", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("10000010") - err = testSuggestPeer(t, k, "<nil>", 1, false) + On(k, "10000010") + err = testSuggestPeer(k, "<nil>", 1, false) if err != nil { t.Fatal(err.Error()) } - k.On("01000010") - err = testSuggestPeer(t, k, "<nil>", 2, false) + On(k, "01000010") + err = testSuggestPeer(k, "<nil>", 2, false) if err != nil { t.Fatal(err.Error()) } - k.On("00100010") - err = testSuggestPeer(t, k, "<nil>", 3, false) + On(k, "00100010") + err = testSuggestPeer(k, "<nil>", 3, false) if err != nil { t.Fatal(err.Error()) } - k.On("00010010") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "00010010") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } @@ -282,10 +255,8 @@ func TestSuggestPeerFindPeers(t *testing.T) { } func TestSuggestPeerRetries(t *testing.T) { - t.Skip("Test is disabled, because it is flaky. It fails with kademlia_test.go:346: incorrect peer address suggested. expected <nil>, got 01000000") - // 2 row gap, unsaturated proxbin, no callables -> want PO 0 k := newTestKademlia("00000000") - k.RetryInterval = int64(100 * time.Millisecond) // cycle + k.RetryInterval = int64(300 * time.Millisecond) // cycle k.MaxRetries = 50 k.RetryExponent = 2 sleep := func(n int) { @@ -296,53 +267,53 @@ func TestSuggestPeerRetries(t *testing.T) { time.Sleep(time.Duration(ts)) } - k.Register("01000000") - k.On("00000001", "00000010") - err := testSuggestPeer(t, k, "01000000", 0, false) + Register(k, "01000000") + On(k, "00000001", "00000010") + err := testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(1) - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(1) - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(2) - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(2) - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } @@ -350,7 +321,9 @@ func TestSuggestPeerRetries(t *testing.T) { } func TestKademliaHiveString(t *testing.T) { - k := newTestKademlia("00000000").On("01000000", "00100000").Register("10000000", "10000001") + k := newTestKademlia("00000000") + On(k, "01000000", "00100000") + Register(k, "10000000", "10000001") k.MaxProxDisplay = 8 h := k.String() expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), MinProxBinSize: 2, MinBinSize: 1, MaxBinSize: 4\n000 0 | 2 8100 (0) 8000 (0)\n============ DEPTH: 1 ==========================================\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" @@ -378,7 +351,7 @@ func testKademliaCase(t *testing.T, pivotAddr string, addrs ...string) { continue } p := &BzzAddr{OAddr: a, UAddr: a} - if err := k.Register([]OverlayAddr{p}); err != nil { + if err := k.Register(p); err != nil { t.Fatal(err) } } @@ -392,12 +365,12 @@ func testKademliaCase(t *testing.T, pivotAddr string, addrs ...string) { if a == nil { break } - k.On(&BzzPeer{BzzAddr: a.(*BzzAddr)}) + k.On(NewPeer(&BzzPeer{BzzAddr: a}, k)) } h := k.Healthy(pp) if !(h.GotNN && h.KnowNN && h.Full) { - t.Error("not healthy") + t.Fatalf("not healthy: %#v\n%v", h, k.String()) } } diff --git a/swarm/network/networkid_test.go b/swarm/network/networkid_test.go index 05134b083..91a1f6d7b 100644 --- a/swarm/network/networkid_test.go +++ b/swarm/network/networkid_test.go @@ -92,7 +92,7 @@ func TestNetworkID(t *testing.T) { if kademlias[node].addrs.Size() != len(netIDGroup)-1 { t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1) } - kademlias[node].EachAddr(nil, 0, func(addr OverlayAddr, _ int, _ bool) bool { + kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int, _ bool) bool { found := false for _, nd := range netIDGroup { p := ToOverlayAddr(nd.Bytes()) diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index 7f7ca5eed..ef0956d5f 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -62,32 +62,6 @@ var DiscoverySpec = &protocols.Spec{ }, } -// Addr interface that peerPool needs -type Addr interface { - OverlayPeer - Over() []byte - Under() []byte - String() string - Update(OverlayAddr) OverlayAddr -} - -// Peer interface represents an live peer connection -type Peer interface { - Addr // the address of a peer - Conn // the live connection (protocols.Peer) - LastActive() time.Time // last time active -} - -// Conn interface represents an live peer connection -type Conn interface { - ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool - Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages - Send(context.Context, interface{}) error // can send messages - Drop(error) // disconnect this peer - Run(func(context.Context, interface{}) error) error // the run function to run a protocol - Off() OverlayAddr -} - // BzzConfig captures the config params used by the hive type BzzConfig struct { OverlayAddr []byte // base address of the overlay network @@ -114,7 +88,7 @@ type Bzz struct { // * bzz config // * overlay driver // * peer store -func NewBzz(config *BzzConfig, kad Overlay, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz { +func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz { return &Bzz{ Hive: NewHive(config.HiveParams, kad, store), NetworkID: config.NetworkID, @@ -131,7 +105,7 @@ func (b *Bzz) UpdateLocalAddr(byteaddr []byte) *BzzAddr { b.localAddr = b.localAddr.Update(&BzzAddr{ UAddr: byteaddr, OAddr: b.localAddr.OAddr, - }).(*BzzAddr) + }) return b.localAddr } @@ -274,7 +248,7 @@ type BzzPeer struct { LightNode bool } -func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer { +func NewBzzPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer { return &BzzPeer{ Peer: p, localAddr: addr, @@ -282,11 +256,6 @@ func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer { } } -// Off returns the overlay peer record for offline persistence -func (p *BzzPeer) Off() OverlayAddr { - return p.BzzAddr -} - // LastActive returns the time the peer was last active func (p *BzzPeer) LastActive() time.Time { return p.lastActive @@ -388,8 +357,8 @@ func (a *BzzAddr) ID() discover.NodeID { } // Update updates the underlay address of a peer record -func (a *BzzAddr) Update(na OverlayAddr) OverlayAddr { - return &BzzAddr{a.OAddr, na.(Addr).Under()} +func (a *BzzAddr) Update(na *BzzAddr) *BzzAddr { + return &BzzAddr{a.OAddr, na.UAddr} } // String pretty prints the address @@ -410,9 +379,9 @@ func RandomAddr() *BzzAddr { } // NewNodeIDFromAddr transforms the underlay address to an adapters.NodeID -func NewNodeIDFromAddr(addr Addr) discover.NodeID { - log.Info(fmt.Sprintf("uaddr=%s", string(addr.Under()))) - node := discover.MustParseNode(string(addr.Under())) +func NewNodeIDFromAddr(addr *BzzAddr) discover.NodeID { + log.Info(fmt.Sprintf("uaddr=%s", string(addr.UAddr))) + node := discover.MustParseNode(string(addr.UAddr)) return node.ID } diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go index acf3479e5..913d6d837 100644 --- a/swarm/network/simulations/discovery/discovery_test.go +++ b/swarm/network/simulations/discovery/discovery_test.go @@ -556,8 +556,8 @@ func newService(ctx *adapters.ServiceContext) (node.Service, error) { kp.MinProxBinSize = testMinProxBinSize if ctx.Config.Reachable != nil { - kp.Reachable = func(o network.OverlayAddr) bool { - return ctx.Config.Reachable(o.(*network.BzzAddr).ID()) + kp.Reachable = func(o *network.BzzAddr) bool { + return ctx.Config.Reachable(o.ID()) } } kad := network.NewKademlia(addr.Over(), kp) diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 36040339d..627352535 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -47,15 +47,15 @@ var ( type Delivery struct { db *storage.DBAPI - overlay network.Overlay + kad *network.Kademlia receiveC chan *ChunkDeliveryMsg getPeer func(discover.NodeID) *Peer } -func NewDelivery(overlay network.Overlay, db *storage.DBAPI) *Delivery { +func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery { d := &Delivery{ db: db, - overlay: overlay, + kad: kad, receiveC: make(chan *ChunkDeliveryMsg, deliveryCap), } @@ -172,7 +172,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * t := time.NewTimer(10 * time.Minute) defer t.Stop() - log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.overlay.BaseAddr()), "created", created) + log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created) start := time.Now() select { case <-chunk.ReqC: @@ -269,8 +269,8 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck var err error requestFromPeersCount.Inc(1) - d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool { - spId := p.(network.Peer).ID() + d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool { + spId := p.ID() for _, p := range peersToSkip { if p == spId { log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 4e1ab09fc..313019d6a 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -457,15 +457,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { //returns the number of subscriptions requested func startSyncing(r *Registry, conf *synctestConfig) (int, error) { var err error - - kad, ok := r.delivery.overlay.(*network.Kademlia) - if !ok { - return 0, fmt.Errorf("Not a Kademlia!") - } - + kad := r.delivery.kad subCnt := 0 //iterate over each bin and solicit needed subscription to bins - kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool { + kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool { //identify begin and start index of the bin(s) we want to subscribe to histRange := &Range{} diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index cd0580a0c..deffdfc3f 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -130,7 +130,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i // wait for kademlia table to be healthy time.Sleep(options.SyncUpdateDelay) - kad := streamer.delivery.overlay.(*network.Kademlia) + kad := streamer.delivery.kad depthC := latestIntC(kad.NeighbourhoodDepthC()) addressBookSizeC := latestIntC(kad.AddrCountC()) @@ -398,9 +398,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { // and they are no longer required after iteration, request to Quit // them will be send to appropriate peers. func (r *Registry) updateSyncing() { - // if overlay in not Kademlia, panic - kad := r.delivery.overlay.(*network.Kademlia) - + kad := r.delivery.kad // map of all SYNC streams for all peers // used at the and of the function to remove servers // that are not needed anymore @@ -421,8 +419,7 @@ func (r *Registry) updateSyncing() { r.peersMu.RUnlock() // request subscriptions for all nodes and bins - kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(conn network.OverlayConn, bin int) bool { - p := conn.(network.Peer) + kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool { log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin)) // bin is always less then 256 and it is safe to convert it to type uint8 @@ -461,10 +458,11 @@ func (r *Registry) updateSyncing() { func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := protocols.NewPeer(p, rw, Spec) - bzzPeer := network.NewBzzTestPeer(peer, r.addr) - r.delivery.overlay.On(bzzPeer) - defer r.delivery.overlay.Off(bzzPeer) - return r.Run(bzzPeer) + bp := network.NewBzzPeer(peer, r.addr) + np := network.NewPeer(bp, r.delivery.kad) + r.delivery.kad.On(np) + defer r.delivery.kad.Off(np) + return r.Run(bp) } // HandleMsg is the message handler that delegates incoming messages |