aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/pss/pss.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/pss/pss.go')
-rw-r--r--swarm/pss/pss.go857
1 files changed, 0 insertions, 857 deletions
diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go
deleted file mode 100644
index 0d02c9b8d..000000000
--- a/swarm/pss/pss.go
+++ /dev/null
@@ -1,857 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package pss
-
-import (
- "bytes"
- "context"
- "crypto/ecdsa"
- "crypto/rand"
- "errors"
- "fmt"
- "hash"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/protocols"
- "github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/network"
- "github.com/ethereum/go-ethereum/swarm/pot"
- "github.com/ethereum/go-ethereum/swarm/storage"
- whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
- "golang.org/x/crypto/sha3"
-)
-
-const (
- defaultPaddingByteSize = 16
- DefaultMsgTTL = time.Second * 120
- defaultDigestCacheTTL = time.Second * 10
- defaultSymKeyCacheCapacity = 512
- digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
- defaultWhisperWorkTime = 3
- defaultWhisperPoW = 0.0000000001
- defaultMaxMsgSize = 1024 * 1024
- defaultCleanInterval = time.Second * 60 * 10
- defaultOutboxCapacity = 100000
- pssProtocolName = "pss"
- pssVersion = 2
- hasherCount = 8
-)
-
-var (
- addressLength = len(pot.Address{})
-)
-
-// cache is used for preventing backwards routing
-// will also be instrumental in flood guard mechanism
-// and mailbox implementation
-type pssCacheEntry struct {
- expiresAt time.Time
-}
-
-// abstraction to enable access to p2p.protocols.Peer.Send
-type senderPeer interface {
- Info() *p2p.PeerInfo
- ID() enode.ID
- Address() []byte
- Send(context.Context, interface{}) error
-}
-
-// per-key peer related information
-// member `protected` prevents garbage collection of the instance
-type pssPeer struct {
- lastSeen time.Time
- address PssAddress
- protected bool
-}
-
-// Pss configuration parameters
-type PssParams struct {
- MsgTTL time.Duration
- CacheTTL time.Duration
- privateKey *ecdsa.PrivateKey
- SymKeyCacheCapacity int
- AllowRaw bool // If true, enables sending and receiving messages without builtin pss encryption
-}
-
-// Sane defaults for Pss
-func NewPssParams() *PssParams {
- return &PssParams{
- MsgTTL: DefaultMsgTTL,
- CacheTTL: defaultDigestCacheTTL,
- SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
- }
-}
-
-func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
- params.privateKey = privatekey
- return params
-}
-
-// Toplevel pss object, takes care of message sending, receiving, decryption and encryption, message handler dispatchers and message forwarding.
-//
-// Implements node.Service
-type Pss struct {
- *network.Kademlia // we can get the Kademlia address from this
- *KeyStore
-
- privateKey *ecdsa.PrivateKey // pss can have it's own independent key
- auxAPIs []rpc.API // builtins (handshake, test) can add APIs
-
- // sending and forwarding
- fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
- fwdPoolMu sync.RWMutex
- fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
- fwdCacheMu sync.RWMutex
- cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented)
- msgTTL time.Duration
- paddingByteSize int
- capstring string
- outbox chan *PssMsg
-
- // message handling
- handlers map[Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle()
- handlersMu sync.RWMutex
- hashPool sync.Pool
- topicHandlerCaps map[Topic]*handlerCaps // caches capabilities of each topic's handlers
- topicHandlerCapsMu sync.RWMutex
-
- // process
- quitC chan struct{}
-}
-
-func (p *Pss) String() string {
- return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
-}
-
-// Creates a new Pss instance.
-//
-// In addition to params, it takes a swarm network Kademlia
-// and a FileStore storage for message cache storage.
-func NewPss(k *network.Kademlia, params *PssParams) (*Pss, error) {
- if params.privateKey == nil {
- return nil, errors.New("missing private key for pss")
- }
- cap := p2p.Cap{
- Name: pssProtocolName,
- Version: pssVersion,
- }
- ps := &Pss{
- Kademlia: k,
- KeyStore: loadKeyStore(),
-
- privateKey: params.privateKey,
- quitC: make(chan struct{}),
-
- fwdPool: make(map[string]*protocols.Peer),
- fwdCache: make(map[pssDigest]pssCacheEntry),
- cacheTTL: params.CacheTTL,
- msgTTL: params.MsgTTL,
- paddingByteSize: defaultPaddingByteSize,
- capstring: cap.String(),
- outbox: make(chan *PssMsg, defaultOutboxCapacity),
-
- handlers: make(map[Topic]map[*handler]bool),
- topicHandlerCaps: make(map[Topic]*handlerCaps),
-
- hashPool: sync.Pool{
- New: func() interface{} {
- return sha3.NewLegacyKeccak256()
- },
- },
- }
-
- for i := 0; i < hasherCount; i++ {
- hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
- ps.hashPool.Put(hashfunc)
- }
-
- return ps, nil
-}
-
-/////////////////////////////////////////////////////////////////////
-// SECTION: node.Service interface
-/////////////////////////////////////////////////////////////////////
-
-func (p *Pss) Start(srv *p2p.Server) error {
- go func() {
- ticker := time.NewTicker(defaultCleanInterval)
- cacheTicker := time.NewTicker(p.cacheTTL)
- defer ticker.Stop()
- defer cacheTicker.Stop()
- for {
- select {
- case <-cacheTicker.C:
- p.cleanFwdCache()
- case <-ticker.C:
- p.cleanKeys()
- case <-p.quitC:
- return
- }
- }
- }()
- go func() {
- for {
- select {
- case msg := <-p.outbox:
- err := p.forward(msg)
- if err != nil {
- log.Error(err.Error())
- metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
- }
- case <-p.quitC:
- return
- }
- }
- }()
- log.Info("Started Pss")
- log.Info("Loaded EC keys", "pubkey", common.ToHex(crypto.FromECDSAPub(p.PublicKey())), "secp256", common.ToHex(crypto.CompressPubkey(p.PublicKey())))
- return nil
-}
-
-func (p *Pss) Stop() error {
- log.Info("Pss shutting down")
- close(p.quitC)
- return nil
-}
-
-var pssSpec = &protocols.Spec{
- Name: pssProtocolName,
- Version: pssVersion,
- MaxMsgSize: defaultMaxMsgSize,
- Messages: []interface{}{
- PssMsg{},
- },
-}
-
-func (p *Pss) Protocols() []p2p.Protocol {
- return []p2p.Protocol{
- {
- Name: pssSpec.Name,
- Version: pssSpec.Version,
- Length: pssSpec.Length(),
- Run: p.Run,
- },
- }
-}
-
-func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
- pp := protocols.NewPeer(peer, rw, pssSpec)
- p.fwdPoolMu.Lock()
- p.fwdPool[peer.Info().ID] = pp
- p.fwdPoolMu.Unlock()
- return pp.Run(p.handlePssMsg)
-}
-
-func (p *Pss) APIs() []rpc.API {
- apis := []rpc.API{
- {
- Namespace: "pss",
- Version: "1.0",
- Service: NewAPI(p),
- Public: true,
- },
- }
- apis = append(apis, p.auxAPIs...)
- return apis
-}
-
-// add API methods to the pss API
-// must be run before node is started
-func (p *Pss) addAPI(api rpc.API) {
- p.auxAPIs = append(p.auxAPIs, api)
-}
-
-// Returns the swarm Kademlia address of the pss node
-func (p *Pss) BaseAddr() []byte {
- return p.Kademlia.BaseAddr()
-}
-
-// Returns the pss node's public key
-func (p *Pss) PublicKey() *ecdsa.PublicKey {
- return &p.privateKey.PublicKey
-}
-
-/////////////////////////////////////////////////////////////////////
-// SECTION: Message handling
-/////////////////////////////////////////////////////////////////////
-
-func (p *Pss) getTopicHandlerCaps(topic Topic) (hc *handlerCaps, found bool) {
- p.topicHandlerCapsMu.RLock()
- defer p.topicHandlerCapsMu.RUnlock()
- hc, found = p.topicHandlerCaps[topic]
- return
-}
-
-func (p *Pss) setTopicHandlerCaps(topic Topic, hc *handlerCaps) {
- p.topicHandlerCapsMu.Lock()
- defer p.topicHandlerCapsMu.Unlock()
- p.topicHandlerCaps[topic] = hc
-}
-
-// Links a handler function to a Topic
-//
-// All incoming messages with an envelope Topic matching the
-// topic specified will be passed to the given Handler function.
-//
-// There may be an arbitrary number of handler functions per topic.
-//
-// Returns a deregister function which needs to be called to
-// deregister the handler,
-func (p *Pss) Register(topic *Topic, hndlr *handler) func() {
- p.handlersMu.Lock()
- defer p.handlersMu.Unlock()
- handlers := p.handlers[*topic]
- if handlers == nil {
- handlers = make(map[*handler]bool)
- p.handlers[*topic] = handlers
- log.Debug("registered handler", "capabilities", hndlr.caps)
- }
- if hndlr.caps == nil {
- hndlr.caps = &handlerCaps{}
- }
- handlers[hndlr] = true
-
- capabilities, ok := p.getTopicHandlerCaps(*topic)
- if !ok {
- capabilities = &handlerCaps{}
- p.setTopicHandlerCaps(*topic, capabilities)
- }
-
- if hndlr.caps.raw {
- capabilities.raw = true
- }
- if hndlr.caps.prox {
- capabilities.prox = true
- }
- return func() { p.deregister(topic, hndlr) }
-}
-
-func (p *Pss) deregister(topic *Topic, hndlr *handler) {
- p.handlersMu.Lock()
- defer p.handlersMu.Unlock()
- handlers := p.handlers[*topic]
- if len(handlers) > 1 {
- delete(p.handlers, *topic)
- // topic caps might have changed now that a handler is gone
- caps := &handlerCaps{}
- for h := range handlers {
- if h.caps.raw {
- caps.raw = true
- }
- if h.caps.prox {
- caps.prox = true
- }
- }
- p.setTopicHandlerCaps(*topic, caps)
- return
- }
- delete(handlers, hndlr)
-}
-
-// Filters incoming messages for processing or forwarding.
-// Check if address partially matches
-// If yes, it CAN be for us, and we process it
-// Only passes error to pss protocol handler if payload is not valid pssmsg
-func (p *Pss) handlePssMsg(ctx context.Context, msg interface{}) error {
- metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
- pssmsg, ok := msg.(*PssMsg)
- if !ok {
- return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
- }
- log.Trace("handler", "self", label(p.Kademlia.BaseAddr()), "topic", label(pssmsg.Payload.Topic[:]))
- if int64(pssmsg.Expire) < time.Now().Unix() {
- metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
- log.Warn("pss filtered expired message", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", common.ToHex(pssmsg.To))
- return nil
- }
- if p.checkFwdCache(pssmsg) {
- log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Kademlia.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
- return nil
- }
- p.addFwdCache(pssmsg)
-
- psstopic := Topic(pssmsg.Payload.Topic)
-
- // raw is simplest handler contingency to check, so check that first
- var isRaw bool
- if pssmsg.isRaw() {
- if capabilities, ok := p.getTopicHandlerCaps(psstopic); ok {
- if !capabilities.raw {
- log.Debug("No handler for raw message", "topic", psstopic)
- return nil
- }
- }
- isRaw = true
- }
-
- // check if we can be recipient:
- // - no prox handler on message and partial address matches
- // - prox handler on message and we are in prox regardless of partial address match
- // store this result so we don't calculate again on every handler
- var isProx bool
- if capabilities, ok := p.getTopicHandlerCaps(psstopic); ok {
- isProx = capabilities.prox
- }
- isRecipient := p.isSelfPossibleRecipient(pssmsg, isProx)
- if !isRecipient {
- log.Trace("pss msg forwarding ===>", "pss", common.ToHex(p.BaseAddr()), "prox", isProx)
- return p.enqueue(pssmsg)
- }
-
- log.Trace("pss msg processing <===", "pss", common.ToHex(p.BaseAddr()), "prox", isProx, "raw", isRaw, "topic", label(pssmsg.Payload.Topic[:]))
- if err := p.process(pssmsg, isRaw, isProx); err != nil {
- qerr := p.enqueue(pssmsg)
- if qerr != nil {
- return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
- }
- }
- return nil
-}
-
-// Entry point to processing a message for which the current node can be the intended recipient.
-// Attempts symmetric and asymmetric decryption with stored keys.
-// Dispatches message to all handlers matching the message topic
-func (p *Pss) process(pssmsg *PssMsg, raw bool, prox bool) error {
- metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
-
- var err error
- var recvmsg *whisper.ReceivedMessage
- var payload []byte
- var from PssAddress
- var asymmetric bool
- var keyid string
- var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, PssAddress, error)
-
- envelope := pssmsg.Payload
- psstopic := Topic(envelope.Topic)
-
- if raw {
- payload = pssmsg.Payload.Data
- } else {
- if pssmsg.isSym() {
- keyFunc = p.processSym
- } else {
- asymmetric = true
- keyFunc = p.processAsym
- }
-
- recvmsg, keyid, from, err = keyFunc(envelope)
- if err != nil {
- return errors.New("Decryption failed")
- }
- payload = recvmsg.Payload
- }
-
- if len(pssmsg.To) < addressLength || prox {
- err = p.enqueue(pssmsg)
- }
- p.executeHandlers(psstopic, payload, from, raw, prox, asymmetric, keyid)
- return err
-}
-
-// copy all registered handlers for respective topic in order to avoid data race or deadlock
-func (p *Pss) getHandlers(topic Topic) (ret []*handler) {
- p.handlersMu.RLock()
- defer p.handlersMu.RUnlock()
- for k := range p.handlers[topic] {
- ret = append(ret, k)
- }
- return ret
-}
-
-func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw bool, prox bool, asymmetric bool, keyid string) {
- handlers := p.getHandlers(topic)
- peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
- for _, h := range handlers {
- if !h.caps.raw && raw {
- log.Warn("norawhandler")
- continue
- }
- if !h.caps.prox && prox {
- log.Warn("noproxhandler")
- continue
- }
- err := (h.f)(payload, peer, asymmetric, keyid)
- if err != nil {
- log.Warn("Pss handler failed", "err", err)
- }
- }
-}
-
-// will return false if using partial address
-func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
- return bytes.Equal(msg.To, p.Kademlia.BaseAddr())
-}
-
-// test match of leftmost bytes in given message to node's Kademlia address
-func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
- local := p.Kademlia.BaseAddr()
-
- // if a partial address matches we are possible recipient regardless of prox
- // if not and prox is not set, we are surely not
- if bytes.Equal(msg.To, local[:len(msg.To)]) {
-
- return true
- } else if !prox {
- return false
- }
-
- depth := p.Kademlia.NeighbourhoodDepth()
- po, _ := network.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
- log.Trace("selfpossible", "po", po, "depth", depth)
-
- return depth <= po
-}
-
-/////////////////////////////////////////////////////////////////////
-// SECTION: Message sending
-/////////////////////////////////////////////////////////////////////
-
-func (p *Pss) enqueue(msg *PssMsg) error {
- select {
- case p.outbox <- msg:
- return nil
- default:
- }
-
- metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
- return errors.New("outbox full")
-}
-
-// Send a raw message (any encryption is responsibility of calling client)
-//
-// Will fail if raw messages are disallowed
-func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
- if err := validateAddress(address); err != nil {
- return err
- }
- pssMsgParams := &msgParams{
- raw: true,
- }
- payload := &whisper.Envelope{
- Data: msg,
- Topic: whisper.TopicType(topic),
- }
- pssMsg := newPssMsg(pssMsgParams)
- pssMsg.To = address
- pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
- pssMsg.Payload = payload
- p.addFwdCache(pssMsg)
- err := p.enqueue(pssMsg)
- if err != nil {
- return err
- }
-
- // if we have a proxhandler on this topic
- // also deliver message to ourselves
- if capabilities, ok := p.getTopicHandlerCaps(topic); ok {
- if p.isSelfPossibleRecipient(pssMsg, true) && capabilities.prox {
- return p.process(pssMsg, true, true)
- }
- }
- return nil
-}
-
-// Send a message using symmetric encryption
-//
-// Fails if the key id does not match any of the stored symmetric keys
-func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
- symkey, err := p.GetSymmetricKey(symkeyid)
- if err != nil {
- return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
- }
- psp, ok := p.getPeerSym(symkeyid, topic)
- if !ok {
- return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
- }
- return p.send(psp.address, topic, msg, false, symkey)
-}
-
-// Send a message using asymmetric encryption
-//
-// Fails if the key id does not match any in of the stored public keys
-func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
- if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
- return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
- }
- psp, ok := p.getPeerPub(pubkeyid, topic)
- if !ok {
- return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
- }
- return p.send(psp.address, topic, msg, true, common.FromHex(pubkeyid))
-}
-
-// Send is payload agnostic, and will accept any byte slice as payload
-// It generates an whisper envelope for the specified recipient and topic,
-// and wraps the message payload in it.
-// TODO: Implement proper message padding
-func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
- metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
-
- if key == nil || bytes.Equal(key, []byte{}) {
- return fmt.Errorf("Zero length key passed to pss send")
- }
- padding := make([]byte, p.paddingByteSize)
- c, err := rand.Read(padding)
- if err != nil {
- return err
- } else if c < p.paddingByteSize {
- return fmt.Errorf("invalid padding length: %d", c)
- }
- wparams := &whisper.MessageParams{
- TTL: defaultWhisperTTL,
- Src: p.privateKey,
- Topic: whisper.TopicType(topic),
- WorkTime: defaultWhisperWorkTime,
- PoW: defaultWhisperPoW,
- Payload: msg,
- Padding: padding,
- }
- if asymmetric {
- pk, err := crypto.UnmarshalPubkey(key)
- if err != nil {
- return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
- }
- wparams.Dst = pk
- } else {
- wparams.KeySym = key
- }
- // set up outgoing message container, which does encryption and envelope wrapping
- woutmsg, err := whisper.NewSentMessage(wparams)
- if err != nil {
- return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
- }
- // performs encryption.
- // Does NOT perform / performs negligible PoW due to very low difficulty setting
- // after this the message is ready for sending
- envelope, err := woutmsg.Wrap(wparams)
- if err != nil {
- return fmt.Errorf("failed to perform whisper encryption: %v", err)
- }
- log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
-
- // prepare for devp2p transport
- pssMsgParams := &msgParams{
- sym: !asymmetric,
- }
- pssMsg := newPssMsg(pssMsgParams)
- pssMsg.To = to
- pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
- pssMsg.Payload = envelope
- err = p.enqueue(pssMsg)
- if err != nil {
- return err
- }
- if capabilities, ok := p.getTopicHandlerCaps(topic); ok {
- if p.isSelfPossibleRecipient(pssMsg, true) && capabilities.prox {
- return p.process(pssMsg, true, true)
- }
- }
- return nil
-}
-
-// sendFunc is a helper function that tries to send a message and returns true on success.
-// It is set here for usage in production, and optionally overridden in tests.
-var sendFunc = sendMsg
-
-// tries to send a message, returns true if successful
-func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
- var isPssEnabled bool
- info := sp.Info()
- for _, capability := range info.Caps {
- if capability == p.capstring {
- isPssEnabled = true
- break
- }
- }
- if !isPssEnabled {
- log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
- return false
- }
-
- // get the protocol peer from the forwarding peer cache
- p.fwdPoolMu.RLock()
- pp := p.fwdPool[sp.Info().ID]
- p.fwdPoolMu.RUnlock()
-
- err := pp.Send(context.TODO(), msg)
- if err != nil {
- metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
- log.Error(err.Error())
- }
-
- return err == nil
-}
-
-// Forwards a pss message to the peer(s) based on recipient address according to the algorithm
-// described below. The recipient address can be of any length, and the byte slice will be matched
-// to the MSB slice of the peer address of the equivalent length.
-//
-// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
-// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
-// partial address, it should be forwarded to all the peers matching the partial address, if there
-// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
-// forwarding fails, the node should try to forward it to the next best peer, until the message is
-// successfully forwarded to at least one peer.
-func (p *Pss) forward(msg *PssMsg) error {
- metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
- sent := 0 // number of successful sends
- to := make([]byte, addressLength)
- copy(to[:len(msg.To)], msg.To)
- neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
-
- // luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
- // but the luminosity is less. here luminosity equals the number of bits given in the destination address.
- luminosityRadius := len(msg.To) * 8
-
- // proximity order function matching up to neighbourhoodDepth bits (po <= neighbourhoodDepth)
- pof := pot.DefaultPof(neighbourhoodDepth)
-
- // soft threshold for msg broadcast
- broadcastThreshold, _ := pof(to, p.BaseAddr(), 0)
- if broadcastThreshold > luminosityRadius {
- broadcastThreshold = luminosityRadius
- }
-
- var onlySendOnce bool // indicates if the message should only be sent to one peer with closest address
-
- // if measured from the recipient address as opposed to the base address (see Kademlia.EachConn
- // call below), then peers that fall in the same proximity bin as recipient address will appear
- // [at least] one bit closer, but only if these additional bits are given in the recipient address.
- if broadcastThreshold < luminosityRadius && broadcastThreshold < neighbourhoodDepth {
- broadcastThreshold++
- onlySendOnce = true
- }
-
- p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
- if po < broadcastThreshold && sent > 0 {
- return false // stop iterating
- }
- if sendFunc(p, sp, msg) {
- sent++
- if onlySendOnce {
- return false
- }
- if po == addressLength*8 {
- // stop iterating if successfully sent to the exact recipient (perfect match of full address)
- return false
- }
- }
- return true
- })
-
- // if we failed to send to anyone, re-insert message in the send-queue
- if sent == 0 {
- log.Debug("unable to forward to any peers")
- if err := p.enqueue(msg); err != nil {
- metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
- log.Error(err.Error())
- return err
- }
- }
-
- // cache the message
- p.addFwdCache(msg)
- return nil
-}
-
-/////////////////////////////////////////////////////////////////////
-// SECTION: Caching
-/////////////////////////////////////////////////////////////////////
-
-// cleanFwdCache is used to periodically remove expired entries from the forward cache
-func (p *Pss) cleanFwdCache() {
- metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
- p.fwdCacheMu.Lock()
- defer p.fwdCacheMu.Unlock()
- for k, v := range p.fwdCache {
- if v.expiresAt.Before(time.Now()) {
- delete(p.fwdCache, k)
- }
- }
-}
-
-func label(b []byte) string {
- return fmt.Sprintf("%04x", b[:2])
-}
-
-// add a message to the cache
-func (p *Pss) addFwdCache(msg *PssMsg) error {
- metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
-
- var entry pssCacheEntry
- var ok bool
-
- p.fwdCacheMu.Lock()
- defer p.fwdCacheMu.Unlock()
-
- digest := p.digest(msg)
- if entry, ok = p.fwdCache[digest]; !ok {
- entry = pssCacheEntry{}
- }
- entry.expiresAt = time.Now().Add(p.cacheTTL)
- p.fwdCache[digest] = entry
- return nil
-}
-
-// check if message is in the cache
-func (p *Pss) checkFwdCache(msg *PssMsg) bool {
- p.fwdCacheMu.Lock()
- defer p.fwdCacheMu.Unlock()
-
- digest := p.digest(msg)
- entry, ok := p.fwdCache[digest]
- if ok {
- if entry.expiresAt.After(time.Now()) {
- log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
- metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
- return true
- }
- metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
- }
- return false
-}
-
-// Digest of message
-func (p *Pss) digest(msg *PssMsg) pssDigest {
- return p.digestBytes(msg.serialize())
-}
-
-func (p *Pss) digestBytes(msg []byte) pssDigest {
- hasher := p.hashPool.Get().(hash.Hash)
- defer p.hashPool.Put(hasher)
- hasher.Reset()
- hasher.Write(msg)
- digest := pssDigest{}
- key := hasher.Sum(nil)
- copy(digest[:], key[:digestLength])
- return digest
-}
-
-func validateAddress(addr PssAddress) error {
- if len(addr) > addressLength {
- return errors.New("address too long")
- }
- return nil
-}