diff options
Diffstat (limited to 'swarm/pss/pss.go')
-rw-r--r-- | swarm/pss/pss.go | 131 |
1 files changed, 80 insertions, 51 deletions
diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go index 1bc28890f..d15401e51 100644 --- a/swarm/pss/pss.go +++ b/swarm/pss/pss.go @@ -891,68 +891,97 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by return nil } -// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct -// The recipient address can be of any length, and the byte slice will be matched to the MSB slice -// of the peer address of the equivalent length. +// sendFunc is a helper function that tries to send a message and returns true on success. +// It is set here for usage in production, and optionally overridden in tests. +var sendFunc func(p *Pss, sp *network.Peer, msg *PssMsg) bool = sendMsg + +// tries to send a message, returns true if successful +func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool { + var isPssEnabled bool + info := sp.Info() + for _, capability := range info.Caps { + if capability == p.capstring { + isPssEnabled = true + break + } + } + if !isPssEnabled { + log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) + return false + } + + // get the protocol peer from the forwarding peer cache + p.fwdPoolMu.RLock() + pp := p.fwdPool[sp.Info().ID] + p.fwdPoolMu.RUnlock() + + err := pp.Send(context.TODO(), msg) + if err != nil { + metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) + log.Error(err.Error()) + } + + return err == nil +} + +// Forwards a pss message to the peer(s) based on recipient address according to the algorithm +// described below. The recipient address can be of any length, and the byte slice will be matched +// to the MSB slice of the peer address of the equivalent length. +// +// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding +// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of +// partial address, it should be forwarded to all the peers matching the partial address, if there +// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message +// forwarding fails, the node should try to forward it to the next best peer, until the message is +// successfully forwarded to at least one peer. func (p *Pss) forward(msg *PssMsg) error { metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) - + sent := 0 // number of successful sends to := make([]byte, addressLength) copy(to[:len(msg.To)], msg.To) + neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth() - // send with kademlia - // find the closest peer to the recipient and attempt to send - sent := 0 - p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool { - info := sp.Info() - - // check if the peer is running pss - var ispss bool - for _, cap := range info.Caps { - if cap == p.capstring { - ispss = true - break - } - } - if !ispss { - log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps) - return true - } + // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness, + // but the luminosity is less. here luminosity equals the number of bits given in the destination address. + luminosityRadius := len(msg.To) * 8 - // get the protocol peer from the forwarding peer cache - sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address()) - p.fwdPoolMu.RLock() - pp := p.fwdPool[sp.Info().ID] - p.fwdPoolMu.RUnlock() + // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth) + pof := pot.DefaultPof(neighbourhoodDepth) - // attempt to send the message - err := pp.Send(context.TODO(), msg) - if err != nil { - metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) - log.Error(err.Error()) - return true + // soft threshold for msg broadcast + broadcastThreshold, _ := pof(to, p.BaseAddr(), 0) + if broadcastThreshold > luminosityRadius { + broadcastThreshold = luminosityRadius + } + + var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address + + // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn + // call below), then peers that fall in the same proximity bin as recipient address will appear + // [at least] one bit closer, but only if these additional bits are given in the recipient address. + if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth { + broadcastThreshold++ + onlySendOnce = true + } + + p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { + if po < broadcastThreshold && sent > 0 { + return false // stop iterating } - sent++ - log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg)) - - // continue forwarding if: - // - if the peer is end recipient but the full address has not been disclosed - // - if the peer address matches the partial address fully - // - if the peer is in proxbin - if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) { - log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match")) - return true - } else if isproxbin { - log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address()))) - return true + if sendFunc(p, sp, msg) { + sent++ + if onlySendOnce { + return false + } + if po == addressLength*8 { + // stop iterating if successfully sent to the exact recipient (perfect match of full address) + return false + } } - // at this point we stop forwarding, and the state is as follows: - // - the peer is end recipient and we have full address - // - we are not in proxbin (directed routing) - // - partial addresses don't fully match - return false + return true }) + // if we failed to send to anyone, re-insert message in the send-queue if sent == 0 { log.Debug("unable to forward to any peers") if err := p.enqueue(msg); err != nil { |