diff options
Diffstat (limited to 'swarm/network/kademlia.go')
-rw-r--r-- | swarm/network/kademlia.go | 154 |
1 files changed, 66 insertions, 88 deletions
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 0177d449c..55a0c6f13 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -62,7 +62,7 @@ type KadParams struct { RetryExponent int // exponent to multiply retry intervals with MaxRetries int // maximum number of redial attempts // function to sanction or prevent suggesting a peer - Reachable func(OverlayAddr) bool + Reachable func(*BzzAddr) bool } // NewKadParams returns a params struct with default values @@ -106,45 +106,22 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia { } } -// OverlayPeer interface captures the common aspect of view of a peer from the Overlay -// topology driver -type OverlayPeer interface { - Address() []byte -} - -// OverlayConn represents a connected peer -type OverlayConn interface { - OverlayPeer - Drop(error) // call to indicate a peer should be expunged - Off() OverlayAddr // call to return a persitent OverlayAddr -} - -// OverlayAddr represents a kademlia peer record -type OverlayAddr interface { - OverlayPeer - Update(OverlayAddr) OverlayAddr // returns the updated version of the original -} - -// entry represents a Kademlia table entry (an extension of OverlayPeer) +// entry represents a Kademlia table entry (an extension of BzzAddr) type entry struct { - OverlayPeer + *BzzAddr + conn *Peer seenAt time.Time retries int } -// newEntry creates a kademlia peer from an OverlayPeer interface -func newEntry(p OverlayPeer) *entry { +// newEntry creates a kademlia peer from a *Peer +func newEntry(p *BzzAddr) *entry { return &entry{ - OverlayPeer: p, - seenAt: time.Now(), + BzzAddr: p, + seenAt: time.Now(), } } -// Bin is the binary (bitvector) serialisation of the entry address -func (e *entry) Bin() string { - return pot.ToBin(e.addr().Address()) -} - // Label is a short tag for the entry for debug func Label(e *entry) string { return fmt.Sprintf("%s (%d)", e.Hex()[:4], e.retries) @@ -152,29 +129,12 @@ func Label(e *entry) string { // Hex is the hexadecimal serialisation of the entry address func (e *entry) Hex() string { - return fmt.Sprintf("%x", e.addr().Address()) + return fmt.Sprintf("%x", e.Address()) } -// String is the short tag for the entry -func (e *entry) String() string { - return fmt.Sprintf("%s (%d)", e.Hex()[:8], e.retries) -} - -// addr returns the kad peer record (OverlayAddr) corresponding to the entry -func (e *entry) addr() OverlayAddr { - a, _ := e.OverlayPeer.(OverlayAddr) - return a -} - -// conn returns the connected peer (OverlayPeer) corresponding to the entry -func (e *entry) conn() OverlayConn { - c, _ := e.OverlayPeer.(OverlayConn) - return c -} - -// Register enters each OverlayAddr as kademlia peer record into the +// Register enters each address as kademlia peer record into the // database of known peer addresses -func (k *Kademlia) Register(peers []OverlayAddr) error { +func (k *Kademlia) Register(peers ...*BzzAddr) error { k.lock.Lock() defer k.lock.Unlock() var known, size int @@ -203,7 +163,6 @@ func (k *Kademlia) Register(peers []OverlayAddr) error { if k.addrCountC != nil && size-known > 0 { k.addrCountC <- k.addrs.Size() } - // log.Trace(fmt.Sprintf("%x registered %v peers, %v known, total: %v", k.BaseAddr()[:4], size, known, k.addrs.Size())) k.sendNeighbourhoodDepthChange() return nil @@ -212,7 +171,7 @@ func (k *Kademlia) Register(peers []OverlayAddr) error { // SuggestPeer returns a known peer for the lowest proximity bin for the // lowest bincount below depth // naturally if there is an empty row it returns a peer for that -func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { +func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) { k.lock.Lock() defer k.lock.Unlock() minsize := k.MinBinSize @@ -224,15 +183,18 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { if po < depth { return false } - a = k.callable(val) + e := val.(*entry) + c := k.callable(e) + if c { + a = e.BzzAddr + } ppo = po - return a == nil + return !c }) if a != nil { log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo)) return a, 0, false } - // log.Trace(fmt.Sprintf("%08x no candidate nearest neighbours to connect to (Depth: %v, minProxSize: %v) %#v", k.BaseAddr()[:4], depth, k.MinProxBinSize, a)) var bpo []int prev := -1 @@ -250,7 +212,6 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { }) // all buckets are full, ie., minsize == k.MinBinSize if len(bpo) == 0 { - // log.Debug(fmt.Sprintf("%08x: all bins saturated", k.BaseAddr()[:4])) return nil, 0, false } // as long as we got candidate peers to connect to @@ -264,8 +225,12 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { return false } return f(func(val pot.Val, _ int) bool { - a = k.callable(val) - return a == nil + e := val.(*entry) + c := k.callable(e) + if c { + a = e.BzzAddr + } + return !c }) }) // found a candidate @@ -282,25 +247,26 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { } // On inserts the peer as a kademlia peer into the live peers -func (k *Kademlia) On(p OverlayConn) (uint8, bool) { +func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() - e := newEntry(p) var ins bool k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(v pot.Val) pot.Val { // if not found live if v == nil { ins = true // insert new online peer into conns - return e + return p } // found among live peers, do nothing return v }) if ins { + a := newEntry(p.BzzAddr) + a.conn = p // insert new online peer into addrs k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val { - return e + return a }) // send new address count value only if the peer is inserted if k.addrCountC != nil { @@ -324,6 +290,8 @@ func (k *Kademlia) On(p OverlayConn) (uint8, bool) { // Not receiving from the returned channel will block On function // when the neighbourhood depth is changed. func (k *Kademlia) NeighbourhoodDepthC() <-chan int { + k.lock.Lock() + defer k.lock.Unlock() if k.nDepthC == nil { k.nDepthC = make(chan int) } @@ -357,7 +325,7 @@ func (k *Kademlia) AddrCountC() <-chan int { } // Off removes a peer from among live peers -func (k *Kademlia) Off(p OverlayConn) { +func (k *Kademlia) Off(p *Peer) { k.lock.Lock() defer k.lock.Unlock() var del bool @@ -367,7 +335,7 @@ func (k *Kademlia) Off(p OverlayConn) { panic(fmt.Sprintf("connected peer not found %v", p)) } del = true - return newEntry(p.Off()) + return newEntry(p.BzzAddr) }) if del { @@ -383,7 +351,7 @@ func (k *Kademlia) Off(p OverlayConn) { } } -func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn OverlayConn, po int) bool) { +func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn *Peer, po int) bool) { k.lock.RLock() defer k.lock.RUnlock() @@ -403,7 +371,7 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con for bin := startPo; bin <= endPo; bin++ { f(func(val pot.Val, _ int) bool { - return eachBinFunc(val.(*entry).conn(), bin) + return eachBinFunc(val.(*Peer), bin) }) } return true @@ -413,13 +381,13 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con // EachConn is an iterator with args (base, po, f) applies f to each live peer // that has proximity order po or less as measured from the base // if base is nil, kademlia base address is used -func (k *Kademlia) EachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) { +func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int, bool) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachConn(base, o, f) } -func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) { +func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) { if len(base) == 0 { base = k.base } @@ -428,20 +396,20 @@ func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) b if po > o { return true } - return f(val.(*entry).conn(), po, po >= depth) + return f(val.(*Peer), po, po >= depth) }) } // EachAddr called with (base, po, f) is an iterator applying f to each known peer // that has proximity order po or less as measured from the base // if base is nil, kademlia base address is used -func (k *Kademlia) EachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) { +func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachAddr(base, o, f) } -func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) { +func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { if len(base) == 0 { base = k.base } @@ -450,7 +418,7 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) b if po > o { return true } - return f(val.(*entry).addr(), po, po >= depth) + return f(val.(*entry).BzzAddr, po, po >= depth) }) } @@ -472,12 +440,11 @@ func (k *Kademlia) neighbourhoodDepth() (depth int) { return depth } -// callable when called with val, -func (k *Kademlia) callable(val pot.Val) OverlayAddr { - e := val.(*entry) +// callable decides if an address entry represents a callable peer +func (k *Kademlia) callable(e *entry) bool { // not callable if peer is live or exceeded maxRetries - if e.conn() != nil || e.retries > k.MaxRetries { - return nil + if e.conn != nil || e.retries > k.MaxRetries { + return false } // calculate the allowed number of retries based on time lapsed since last seen timeAgo := int64(time.Since(e.seenAt)) @@ -491,17 +458,17 @@ func (k *Kademlia) callable(val pot.Val) OverlayAddr { // peer can be retried again if retries < e.retries { log.Trace(fmt.Sprintf("%08x: %v long time since last try (at %v) needed before retry %v, wait only warrants %v", k.BaseAddr()[:4], e, timeAgo, e.retries, retries)) - return nil + return false } // function to sanction or prevent suggesting a peer - if k.Reachable != nil && !k.Reachable(e.addr()) { + if k.Reachable != nil && !k.Reachable(e.BzzAddr) { log.Trace(fmt.Sprintf("%08x: peer %v is temporarily not callable", k.BaseAddr()[:4], e)) - return nil + return false } e.retries++ log.Trace(fmt.Sprintf("%08x: peer %v is callable", k.BaseAddr()[:4], e)) - return e.addr() + return true } // BaseAddr return the kademlia base address @@ -516,7 +483,8 @@ func (k *Kademlia) String() string { return k.string() } -// String returns kademlia table + kaddb table displayed with ascii +// string returns kademlia table + kaddb table displayed with ascii +// caller must hold the lock func (k *Kademlia) string() string { wsrow := " " var rows []string @@ -538,7 +506,7 @@ func (k *Kademlia) string() string { row := []string{fmt.Sprintf("%2d", size)} rest -= size f(func(val pot.Val, vpo int) bool { - e := val.(*entry) + e := val.(*Peer) row = append(row, fmt.Sprintf("%x", e.Address()[:2])) rowlen++ return rowlen < 4 @@ -594,8 +562,9 @@ type PeerPot struct { EmptyBins []int } -// NewPeerPotMap creates a map of pot record of OverlayAddr with keys +// NewPeerPotMap creates a map of pot record of *BzzAddr with keys // as hexadecimal representations of the address. +// used for testing only func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { // create a table of all nodes for health check np := pot.NewPot(nil, 0) @@ -640,6 +609,7 @@ func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { // saturation returns the lowest proximity order that the bin for that order // has less than n peers +// It is used in Healthy function for testing only func (k *Kademlia) saturation(n int) int { prev := -1 k.addrs.EachBin(k.base, pof, 0, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool { @@ -654,7 +624,7 @@ func (k *Kademlia) saturation(n int) int { } // full returns true if all required bins have connected peers. -// It is used in Healthy function. +// It is used in Healthy function for testing only func (k *Kademlia) full(emptyBins []int) (full bool) { prev := 0 e := len(emptyBins) @@ -688,10 +658,13 @@ func (k *Kademlia) full(emptyBins []int) (full bool) { return e == 0 } +// knowNearestNeighbours tests if all known nearest neighbours given as arguments +// are found in the addressbook +// It is used in Healthy function for testing only func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool { pm := make(map[string]bool) - k.eachAddr(nil, 255, func(p OverlayAddr, po int, nn bool) bool { + k.eachAddr(nil, 255, func(p *BzzAddr, po int, nn bool) bool { if !nn { return false } @@ -709,10 +682,13 @@ func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool { return true } +// gotNearestNeighbours tests if all known nearest neighbours given as arguments +// are connected peers +// It is used in Healthy function for testing only func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) { pm := make(map[string]bool) - k.eachConn(nil, 255, func(p OverlayConn, po int, nn bool) bool { + k.eachConn(nil, 255, func(p *Peer, po int, nn bool) bool { if !nn { return false } @@ -735,6 +711,7 @@ func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missin } // Health state of the Kademlia +// used for testing only type Health struct { KnowNN bool // whether node knows all its nearest neighbours GotNN bool // whether node is connected to all its nearest neighbours @@ -746,6 +723,7 @@ type Health struct { // Healthy reports the health state of the kademlia connectivity // returns a Health struct +// used for testing only func (k *Kademlia) Healthy(pp *PeerPot) *Health { k.lock.RLock() defer k.lock.RUnlock() |