diff options
Diffstat (limited to 'swarm/network/kademlia.go')
-rw-r--r-- | swarm/network/kademlia.go | 220 |
1 files changed, 133 insertions, 87 deletions
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index da99287f1..f9b38fc48 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -168,85 +168,118 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { return nil } -// SuggestPeer returns a known peer for the lowest proximity bin for the -// lowest bincount below depth -// naturally if there is an empty row it returns a peer for that -func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) { +// SuggestPeer returns an unconnected peer address as a peer suggestion for connection +func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) { k.lock.Lock() defer k.lock.Unlock() - minsize := k.MinBinSize - depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) - // if there is a callable neighbour within the current proxBin, connect - // this makes sure nearest neighbour set is fully connected - var ppo int - k.addrs.EachNeighbour(k.base, Pof, func(val pot.Val, po int) bool { - if po < depth { - return false + radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base) + // collect undersaturated bins in ascending order of number of connected peers + // and from shallow to deep (ascending order of PO) + // insert them in a map of bin arrays, keyed with the number of connected peers + saturation := make(map[int][]int) + var lastPO int // the last non-empty PO bin in the iteration + saturationDepth = -1 // the deepest PO such that all shallower bins have >= k.MinBinSize peers + var pastDepth bool // whether po of iteration >= depth + k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool { + // process skipped empty bins + for ; lastPO < po; lastPO++ { + // find the lowest unsaturated bin + if saturationDepth == -1 { + saturationDepth = lastPO + } + // if there is an empty bin, depth is surely passed + pastDepth = true + saturation[0] = append(saturation[0], lastPO) } - e := val.(*entry) - c := k.callable(e) - if c { - a = e.BzzAddr + lastPO = po + 1 + // past radius, depth is surely passed + if po >= radius { + pastDepth = true } - ppo = po - return !c - }) - if a != nil { - log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo)) - return a, 0, false - } - - var bpo []int - prev := -1 - k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool { - prev++ - for ; prev < po; prev++ { - bpo = append(bpo, prev) - minsize = 0 + // beyond depth the bin is treated as unsaturated even if size >= k.MinBinSize + // in order to achieve full connectivity to all neighbours + if pastDepth && size >= k.MinBinSize { + size = k.MinBinSize - 1 } - if size < minsize { - bpo = append(bpo, po) - minsize = size + // process non-empty unsaturated bins + if size < k.MinBinSize { + // find the lowest unsaturated bin + if saturationDepth == -1 { + saturationDepth = po + } + saturation[size] = append(saturation[size], po) } - return size > 0 && po < depth + return true + }) + // to trigger peer requests for peers closer than closest connection, include + // all bins from nearest connection upto nearest address as unsaturated + var nearestAddrAt int + k.addrs.EachNeighbour(k.base, Pof, func(_ pot.Val, po int) bool { + nearestAddrAt = po + return false }) - // all buckets are full, ie., minsize == k.MinBinSize - if len(bpo) == 0 { + // including bins as size 0 has the effect that requesting connection + // is prioritised over non-empty shallower bins + for ; lastPO <= nearestAddrAt; lastPO++ { + saturation[0] = append(saturation[0], lastPO) + } + // all PO bins are saturated, ie., minsize >= k.MinBinSize, no peer suggested + if len(saturation) == 0 { return nil, 0, false } - // as long as we got candidate peers to connect to - // dont ask for new peers (want = false) - // try to select a candidate peer - // find the first callable peer - nxt := bpo[0] - k.addrs.EachBin(k.base, Pof, nxt, func(po, _ int, f func(func(pot.Val) bool) bool) bool { - // for each bin (up until depth) we find callable candidate peers - if po >= depth { - return false + // find the first callable peer in the address book + // starting from the bins with smallest size proceeding from shallow to deep + // for each bin (up until neighbourhood radius) we find callable candidate peers + for size := 0; size < k.MinBinSize && suggestedPeer == nil; size++ { + bins, ok := saturation[size] + if !ok { + // no bin with this size + continue } - return f(func(val pot.Val) bool { - e := val.(*entry) - c := k.callable(e) - if c { - a = e.BzzAddr + cur := 0 + curPO := bins[0] + k.addrs.EachBin(k.base, Pof, curPO, func(po, _ int, f func(func(pot.Val) bool) bool) bool { + curPO = bins[cur] + // find the next bin that has size size + if curPO == po { + cur++ + } else { + // skip bins that have no addresses + for ; cur < len(bins) && curPO < po; cur++ { + curPO = bins[cur] + } + if po < curPO { + cur-- + return true + } + // stop if there are no addresses + if curPO < po { + return false + } } - return !c + // curPO found + // find a callable peer out of the addresses in the unsaturated bin + // stop if found + f(func(val pot.Val) bool { + e := val.(*entry) + if k.callable(e) { + suggestedPeer = e.BzzAddr + return false + } + return true + }) + return cur < len(bins) && suggestedPeer == nil }) - }) - // found a candidate - if a != nil { - return a, 0, false } - // no candidate peer found, request for the short bin - var changed bool - if uint8(nxt) < k.depth { - k.depth = uint8(nxt) - changed = true + + if uint8(saturationDepth) < k.depth { + k.depth = uint8(saturationDepth) + return suggestedPeer, saturationDepth, true } - return a, nxt, changed + return suggestedPeer, 0, false } -// On inserts the peer as a kademlia peer into the live peers +// On inserts the peer as a kademlia peer into the live peers func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() @@ -398,29 +431,25 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) { }) } +// NeighbourhoodDepth returns the depth for the pot, see depthForPot func (k *Kademlia) NeighbourhoodDepth() (depth int) { k.lock.RLock() defer k.lock.RUnlock() return depthForPot(k.conns, k.NeighbourhoodSize, k.base) } -// depthForPot returns the proximity order that defines the distance of -// the nearest neighbour set with cardinality >= NeighbourhoodSize -// if there is altogether less than NeighbourhoodSize peers it returns 0 +// neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia +// neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize +// i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether +// contain at least neighbourhoodSize connected peers +// if there is altogether less than neighbourhoodSize peers connected, it returns 0 // caller must hold the lock -func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) { +func neighbourhoodRadiusForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) { if p.Size() <= neighbourhoodSize { return 0 } - // total number of peers in iteration var size int - - // determining the depth is a two-step process - // first we find the proximity bin of the shallowest of the NeighbourhoodSize peers - // the numeric value of depth cannot be higher than this - var maxDepth int - f := func(v pot.Val, i int) bool { // po == 256 means that addr is the pivot address(self) if i == 256 { @@ -431,13 +460,30 @@ func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int // this means we have all nn-peers. // depth is by default set to the bin of the farthest nn-peer if size == neighbourhoodSize { - maxDepth = i + depth = i return false } return true } p.EachNeighbour(pivotAddr, Pof, f) + return depth +} + +// depthForPot returns the depth for the pot +// depth is the radius of the minimal extension of nearest neighbourhood that +// includes all empty PO bins. I.e., depth is the deepest PO such that +// - it is not deeper than neighbourhood radius +// - all bins shallower than depth are not empty +// caller must hold the lock +func depthForPot(p *pot.Pot, neighbourhoodSize int, pivotAddr []byte) (depth int) { + if p.Size() <= neighbourhoodSize { + return 0 + } + // determining the depth is a two-step process + // first we find the proximity bin of the shallowest of the neighbourhoodSize peers + // the numeric value of depth cannot be higher than this + maxDepth := neighbourhoodRadiusForPot(p, neighbourhoodSize, pivotAddr) // the second step is to test for empty bins in order from shallowest to deepest // if an empty bin is found, this will be the actual depth @@ -627,23 +673,20 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot { return ppmap } -// saturation iterates through all peers and -// returns the smallest po value in which the node has less than n peers -// if the iterator reaches depth, then value for depth is returned -// TODO move to separate testing tools file -// TODO this function will stop at the first bin with less than MinBinSize peers, even if there are empty bins between that bin and the depth. This may not be correct behavior +// saturation returns the smallest po value in which the node has less than MinBinSize peers +// if the iterator reaches neighbourhood radius, then the last bin + 1 is returned func (k *Kademlia) saturation() int { prev := -1 - k.addrs.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool { + radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base) + k.conns.EachBin(k.base, Pof, 0, func(po, size int, f func(func(val pot.Val) bool) bool) bool { prev++ + if po >= radius { + return false + } return prev == po && size >= k.MinBinSize }) - // TODO evaluate whether this check cannot just as well be done within the eachbin - depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) - - // if in the iterator above we iterated deeper than the neighbourhood depth - return depth - if depth < prev { - return depth + if prev < 0 { + return 0 } return prev } @@ -745,6 +788,9 @@ type Health struct { func (k *Kademlia) Healthy(pp *PeerPot) *Health { k.lock.RLock() defer k.lock.RUnlock() + if len(pp.NNSet) < k.NeighbourhoodSize { + log.Warn("peerpot NNSet < NeighbourhoodSize") + } gotnn, countgotnn, culpritsgotnn := k.connectedNeighbours(pp.NNSet) knownn, countknownn, culpritsknownn := k.knowNeighbours(pp.NNSet) depth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) |