Diffstat (limited to 'swarm/pss/pss.go')
-rw-r--r--  swarm/pss/pss.go  951
1 file changed, 951 insertions, 0 deletions
diff --git a/swarm/pss/pss.go b/swarm/pss/pss.go
new file mode 100644
index 000000000..77191b25a
--- /dev/null
+++ b/swarm/pss/pss.go
@@ -0,0 +1,951 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package pss
+
+import (
+ "bytes"
+ "crypto/ecdsa"
+ "crypto/rand"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/pot"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
+)
+
+const (
+ defaultPaddingByteSize = 16
+ defaultMsgTTL = time.Second * 120
+ defaultDigestCacheTTL = time.Second * 10
+ defaultSymKeyCacheCapacity = 512
+ digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
+ defaultWhisperTTL = 6000 // TTL (in seconds) set on the whisper envelope wrapping a pss message
+ defaultWhisperWorkTime = 3
+ defaultWhisperPoW = 0.0000000001
+ defaultMaxMsgSize = 1024 * 1024
+ defaultCleanInterval = time.Second * 60 * 10
+ defaultOutboxCapacity = 100000
+ pssProtocolName = "pss"
+ pssVersion = 2
+ hasherCount = 8
+)
+
+var (
+ addressLength = len(pot.Address{})
+)
+
+// pssCacheEntry is an entry in the forwarding cache, used for preventing backwards routing;
+// the cache will also be instrumental in the flood guard mechanism
+// and the mailbox implementation
+type pssCacheEntry struct {
+ expiresAt time.Time
+}
+
+// abstraction to enable access to p2p.protocols.Peer.Send
+type senderPeer interface {
+ Info() *p2p.PeerInfo
+ ID() discover.NodeID
+ Address() []byte
+ Send(interface{}) error
+}
+
+// per-key peer related information
+// member `protected` prevents garbage collection of the instance
+type pssPeer struct {
+ lastSeen time.Time
+ address *PssAddress
+ protected bool
+}
+
+// Pss configuration parameters
+type PssParams struct {
+ MsgTTL time.Duration
+ CacheTTL time.Duration
+ privateKey *ecdsa.PrivateKey
+ SymKeyCacheCapacity int
+ AllowRaw bool // If true, enables sending and receiving messages without builtin pss encryption
+}
+
+// NewPssParams returns a PssParams struct with sane defaults
+func NewPssParams() *PssParams {
+ return &PssParams{
+ MsgTTL: defaultMsgTTL,
+ CacheTTL: defaultDigestCacheTTL,
+ SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
+ }
+}
+
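+// WithPrivateKey sets the private key pss will use for asymmetric decryption
+// and node identification, and returns the params so calls can be chained.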
+func (params *PssParams) WithPrivateKey(privatekey *ecdsa.PrivateKey) *PssParams {
+ params.privateKey = privatekey
+ return params
+}
+
+// Pss is the top-level pss object. It takes care of message sending and receiving, encryption and decryption, message handler dispatch, and message forwarding.
+//
+// Implements node.Service
+type Pss struct {
+ network.Overlay // we can get the overlay address from this
+ privateKey *ecdsa.PrivateKey // pss can have its own independent key
+ w *whisper.Whisper // key and encryption backend
+ auxAPIs []rpc.API // builtins (handshake, test) can add APIs
+
+ // sending and forwarding
+ fwdPool map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer
+ fwdPoolMu sync.RWMutex
+ fwdCache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
+ fwdCacheMu sync.RWMutex
+ cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented)
+ msgTTL time.Duration
+ paddingByteSize int
+ capstring string
+ outbox chan *PssMsg
+
+ // keys and peers
+ pubKeyPool map[string]map[Topic]*pssPeer // mapping of hex public keys to peer address by topic.
+ pubKeyPoolMu sync.RWMutex
+ symKeyPool map[string]map[Topic]*pssPeer // mapping of symkeyids to peer address by topic.
+ symKeyPoolMu sync.RWMutex
+ symKeyDecryptCache []*string // fast lookup of symkeys recently used for decryption; last used is on top of stack
+ symKeyDecryptCacheCursor int // modular cursor pointing to last used, wraps on symKeyDecryptCache array
+ symKeyDecryptCacheCapacity int // maximum number of symkeys to keep.
+
+ // message handling
+ handlers map[Topic]map[*Handler]bool // topic and version based pss payload handlers. See pss.Handle()
+ handlersMu sync.RWMutex
+ allowRaw bool
+ hashPool sync.Pool
+
+ // process
+ quitC chan struct{}
+}
+
+func (p *Pss) String() string {
+ return fmt.Sprintf("pss: addr %x, pubkey %v", p.BaseAddr(), common.ToHex(crypto.FromECDSAPub(&p.privateKey.PublicKey)))
+}
+
+// Creates a new Pss instance.
+//
+// In addition to the params, it takes a swarm network overlay,
+// which is used for message routing and provides the node's base address.
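+//
+// A minimal construction sketch, assuming an overlay instance `kad`
+// (network.Overlay) and a private key generated with crypto.GenerateKey:
+//
+//  privkey, _ := crypto.GenerateKey()
+//  params := NewPssParams().WithPrivateKey(privkey)
+//  ps, err := NewPss(kad, params)
+//  if err != nil {
+//      // only fails if no private key was set on the params
+//  }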
+func NewPss(k network.Overlay, params *PssParams) (*Pss, error) {
+ if params.privateKey == nil {
+ return nil, errors.New("missing private key for pss")
+ }
+ cap := p2p.Cap{
+ Name: pssProtocolName,
+ Version: pssVersion,
+ }
+ ps := &Pss{
+ Overlay: k,
+ privateKey: params.privateKey,
+ w: whisper.New(&whisper.DefaultConfig),
+ quitC: make(chan struct{}),
+
+ fwdPool: make(map[string]*protocols.Peer),
+ fwdCache: make(map[pssDigest]pssCacheEntry),
+ cacheTTL: params.CacheTTL,
+ msgTTL: params.MsgTTL,
+ paddingByteSize: defaultPaddingByteSize,
+ capstring: cap.String(),
+ outbox: make(chan *PssMsg, defaultOutboxCapacity),
+
+ pubKeyPool: make(map[string]map[Topic]*pssPeer),
+ symKeyPool: make(map[string]map[Topic]*pssPeer),
+ symKeyDecryptCache: make([]*string, params.SymKeyCacheCapacity),
+ symKeyDecryptCacheCapacity: params.SymKeyCacheCapacity,
+
+ handlers: make(map[Topic]map[*Handler]bool),
+ allowRaw: params.AllowRaw,
+ hashPool: sync.Pool{
+ New: func() interface{} {
+ return storage.MakeHashFunc(storage.DefaultHash)()
+ },
+ },
+ }
+
+ for i := 0; i < hasherCount; i++ {
+ hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
+ ps.hashPool.Put(hashfunc)
+ }
+
+ return ps, nil
+}
+
+/////////////////////////////////////////////////////////////////////
+// SECTION: node.Service interface
+/////////////////////////////////////////////////////////////////////
+
+func (p *Pss) Start(srv *p2p.Server) error {
+ go func() {
+ ticker := time.NewTicker(defaultCleanInterval)
+ cacheTicker := time.NewTicker(p.cacheTTL)
+ defer ticker.Stop()
+ defer cacheTicker.Stop()
+ for {
+ select {
+ case <-cacheTicker.C:
+ p.cleanFwdCache()
+ case <-ticker.C:
+ p.cleanKeys()
+ case <-p.quitC:
+ return
+ }
+ }
+ }()
+ go func() {
+ for {
+ select {
+ case msg := <-p.outbox:
+ err := p.forward(msg)
+ if err != nil {
+ log.Error(err.Error())
+ metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
+ }
+ case <-p.quitC:
+ return
+ }
+ }
+ }()
+ log.Debug("Started pss", "public key", common.ToHex(crypto.FromECDSAPub(p.PublicKey())))
+ return nil
+}
+
+func (p *Pss) Stop() error {
+ log.Info("pss shutting down")
+ close(p.quitC)
+ return nil
+}
+
+var pssSpec = &protocols.Spec{
+ Name: pssProtocolName,
+ Version: pssVersion,
+ MaxMsgSize: defaultMaxMsgSize,
+ Messages: []interface{}{
+ PssMsg{},
+ },
+}
+
+func (p *Pss) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{
+ {
+ Name: pssSpec.Name,
+ Version: pssSpec.Version,
+ Length: pssSpec.Length(),
+ Run: p.Run,
+ },
+ }
+}
+
+func (p *Pss) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ pp := protocols.NewPeer(peer, rw, pssSpec)
+ p.fwdPoolMu.Lock()
+ p.fwdPool[peer.Info().ID] = pp
+ p.fwdPoolMu.Unlock()
+ return pp.Run(p.handlePssMsg)
+}
+
+func (p *Pss) APIs() []rpc.API {
+ apis := []rpc.API{
+ {
+ Namespace: "pss",
+ Version: "1.0",
+ Service: NewAPI(p),
+ Public: true,
+ },
+ }
+ apis = append(apis, p.auxAPIs...)
+ return apis
+}
+
+// add API methods to the pss API
+// must be run before node is started
+func (p *Pss) addAPI(api rpc.API) {
+ p.auxAPIs = append(p.auxAPIs, api)
+}
+
+// Returns the swarm overlay address of the pss node
+func (p *Pss) BaseAddr() []byte {
+ return p.Overlay.BaseAddr()
+}
+
+// Returns the pss node's public key
+func (p *Pss) PublicKey() *ecdsa.PublicKey {
+ return &p.privateKey.PublicKey
+}
+
+/////////////////////////////////////////////////////////////////////
+// SECTION: Message handling
+/////////////////////////////////////////////////////////////////////
+
+// Links a handler function to a Topic
+//
+// All incoming messages with an envelope Topic matching the
+// topic specified will be passed to the given Handler function.
+//
+// There may be an arbitrary number of handler functions per topic.
+//
+// Returns a deregister function which needs to be called to
+// deregister the handler.
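+//
+// A minimal registration sketch, assuming a *Pss instance `ps` and a Topic
+// value `topic` obtained elsewhere:
+//
+//  dereg := ps.Register(&topic, func(msg []byte, peer *p2p.Peer, asym bool, keyid string) error {
+//      log.Debug("pss message received", "len", len(msg), "asym", asym, "keyid", keyid)
+//      return nil
+//  })
+//  defer dereg()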
+func (p *Pss) Register(topic *Topic, handler Handler) func() {
+ p.handlersMu.Lock()
+ defer p.handlersMu.Unlock()
+ handlers := p.handlers[*topic]
+ if handlers == nil {
+ handlers = make(map[*Handler]bool)
+ p.handlers[*topic] = handlers
+ }
+ handlers[&handler] = true
+ return func() { p.deregister(topic, &handler) }
+}
+func (p *Pss) deregister(topic *Topic, h *Handler) {
+ p.handlersMu.Lock()
+ defer p.handlersMu.Unlock()
+ handlers := p.handlers[*topic]
+ if len(handlers) == 1 {
+ delete(p.handlers, *topic)
+ return
+ }
+ delete(handlers, h)
+}
+
+// get all registered handlers for respective topics
+func (p *Pss) getHandlers(topic Topic) map[*Handler]bool {
+ p.handlersMu.RLock()
+ defer p.handlersMu.RUnlock()
+ return p.handlers[topic]
+}
+
+// Filters incoming messages for processing or forwarding.
+// Checks whether the recipient address partially matches this node's address;
+// if it does, the message CAN be for us, and we process it.
+// Only passes an error to the pss protocol handler if the payload is not a valid PssMsg.
+func (p *Pss) handlePssMsg(msg interface{}) error {
+ metrics.GetOrRegisterCounter("pss.handlepssmsg", nil).Inc(1)
+
+ pssmsg, ok := msg.(*PssMsg)
+
+ if !ok {
+ return fmt.Errorf("invalid message type. Expected *PssMsg, got %T ", msg)
+ }
+ if int64(pssmsg.Expire) < time.Now().Unix() {
+ metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
+ log.Warn("pss filtered expired message", "from", fmt.Sprintf("%x", p.Overlay.BaseAddr()), "to", fmt.Sprintf("%x", common.ToHex(pssmsg.To)))
+ return nil
+ }
+ if p.checkFwdCache(pssmsg) {
+ log.Trace(fmt.Sprintf("pss relay block-cache match (process): FROM %x TO %x", p.Overlay.BaseAddr(), common.ToHex(pssmsg.To)))
+ return nil
+ }
+ p.addFwdCache(pssmsg)
+
+ if !p.isSelfPossibleRecipient(pssmsg) {
+ log.Trace("pss was for someone else :'( ... forwarding", "pss", common.ToHex(p.BaseAddr()))
+ return p.enqueue(pssmsg)
+ }
+
+ log.Trace("pss for us, yay! ... let's process!", "pss", common.ToHex(p.BaseAddr()))
+ if err := p.process(pssmsg); err != nil {
+ qerr := p.enqueue(pssmsg)
+ if qerr != nil {
+ return fmt.Errorf("process fail: processerr %v, queueerr: %v", err, qerr)
+ }
+ }
+ return nil
+
+}
+
+// Entry point to processing a message for which the current node can be the intended recipient.
+// Attempts symmetric and asymmetric decryption with stored keys.
+// Dispatches message to all handlers matching the message topic
+func (p *Pss) process(pssmsg *PssMsg) error {
+ metrics.GetOrRegisterCounter("pss.process", nil).Inc(1)
+
+ var err error
+ var recvmsg *whisper.ReceivedMessage
+ var payload []byte
+ var from *PssAddress
+ var asymmetric bool
+ var keyid string
+ var keyFunc func(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error)
+
+ envelope := pssmsg.Payload
+ psstopic := Topic(envelope.Topic)
+ if pssmsg.isRaw() {
+ if !p.allowRaw {
+ return errors.New("raw message support disabled")
+ }
+ payload = pssmsg.Payload.Data
+ } else {
+ if pssmsg.isSym() {
+ keyFunc = p.processSym
+ } else {
+ asymmetric = true
+ keyFunc = p.processAsym
+ }
+
+ recvmsg, keyid, from, err = keyFunc(envelope)
+ if err != nil {
+ return errors.New("Decryption failed")
+ }
+ payload = recvmsg.Payload
+ }
+
+ if len(pssmsg.To) < addressLength {
+ if err := p.enqueue(pssmsg); err != nil {
+ return err
+ }
+ }
+ p.executeHandlers(psstopic, payload, from, asymmetric, keyid)
+
+ return nil
+
+}
+
+func (p *Pss) executeHandlers(topic Topic, payload []byte, from *PssAddress, asymmetric bool, keyid string) {
+ handlers := p.getHandlers(topic)
+ nid, _ := discover.HexID("0x00") // this hack is needed to satisfy the p2p method
+ peer := p2p.NewPeer(nid, fmt.Sprintf("%x", from), []p2p.Cap{})
+ for f := range handlers {
+ err := (*f)(payload, peer, asymmetric, keyid)
+ if err != nil {
+ log.Warn("Pss handler %p failed: %v", f, err)
+ }
+ }
+}
+
+// will return false if using partial address
+func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
+ return bytes.Equal(msg.To, p.Overlay.BaseAddr())
+}
+
+// test match of leftmost bytes in given message to node's overlay address
+func (p *Pss) isSelfPossibleRecipient(msg *PssMsg) bool {
+ local := p.Overlay.BaseAddr()
+ return bytes.Equal(msg.To[:], local[:len(msg.To)])
+}
+
+/////////////////////////////////////////////////////////////////////
+// SECTION: Encryption
+/////////////////////////////////////////////////////////////////////
+
+// Links a peer ECDSA public key to a topic
+//
+// This is required for asymmetric message exchange
+// on the given topic
+//
+// The value in `address` will be used as a routing hint for the
+// public key / topic association
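+//
+// A minimal sketch, assuming the remote peer's public key `theirpub`
+// (*ecdsa.PublicKey) and its (possibly partial) overlay address `theiraddr`
+// ([]byte) are known, and that PssAddress is a byte-slice based type:
+//
+//  addr := PssAddress(theiraddr)
+//  err := ps.SetPeerPublicKey(theirpub, topic, &addr)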
+func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address *PssAddress) error {
+ pubkeybytes := crypto.FromECDSAPub(pubkey)
+ if len(pubkeybytes) == 0 {
+ return fmt.Errorf("invalid public key: %v", pubkey)
+ }
+ pubkeyid := common.ToHex(pubkeybytes)
+ psp := &pssPeer{
+ address: address,
+ }
+ p.pubKeyPoolMu.Lock()
+ if _, ok := p.pubKeyPool[pubkeyid]; !ok {
+ p.pubKeyPool[pubkeyid] = make(map[Topic]*pssPeer)
+ }
+ p.pubKeyPool[pubkeyid][topic] = psp
+ p.pubKeyPoolMu.Unlock()
+ log.Trace("added pubkey", "pubkeyid", pubkeyid, "topic", topic, "address", common.ToHex(*address))
+ return nil
+}
+
+// Automatically generate a new symkey for a topic and address hint
+func (p *Pss) generateSymmetricKey(topic Topic, address *PssAddress, addToCache bool) (string, error) {
+ keyid, err := p.w.GenerateSymKey()
+ if err != nil {
+ return "", err
+ }
+ p.addSymmetricKeyToPool(keyid, topic, address, addToCache, false)
+ return keyid, nil
+}
+
+// Links a peer symmetric key (arbitrary byte sequence) to a topic
+//
+// This is required for symmetrically encrypted message exchange
+// on the given topic
+//
+// The key is stored in the whisper backend.
+//
+// If addtocache is set to true, the key will be added to the cache of keys
+// used to attempt symmetric decryption of incoming messages.
+//
+// Returns a string id that can be used to retrieve the key bytes
+// from the whisper backend (see pss.GetSymmetricKey())
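+//
+// A minimal sketch, assuming a pre-shared key `symkey` ([]byte) and the
+// peer's (possibly partial) overlay address `theiraddr`, with PssAddress
+// as a byte-slice based type:
+//
+//  addr := PssAddress(theiraddr)
+//  symkeyid, err := ps.SetSymmetricKey(symkey, topic, &addr, true)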
+func (p *Pss) SetSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool) (string, error) {
+ return p.setSymmetricKey(key, topic, address, addtocache, true)
+}
+
+func (p *Pss) setSymmetricKey(key []byte, topic Topic, address *PssAddress, addtocache bool, protected bool) (string, error) {
+ keyid, err := p.w.AddSymKeyDirect(key)
+ if err != nil {
+ return "", err
+ }
+ p.addSymmetricKeyToPool(keyid, topic, address, addtocache, protected)
+ return keyid, nil
+}
+
+// adds a symmetric key to the pss key pool, and optionally adds the key
+// to the collection of keys used to attempt symmetric decryption of
+// incoming messages
+func (p *Pss) addSymmetricKeyToPool(keyid string, topic Topic, address *PssAddress, addtocache bool, protected bool) {
+ psp := &pssPeer{
+ address: address,
+ protected: protected,
+ }
+ p.symKeyPoolMu.Lock()
+ if _, ok := p.symKeyPool[keyid]; !ok {
+ p.symKeyPool[keyid] = make(map[Topic]*pssPeer)
+ }
+ p.symKeyPool[keyid][topic] = psp
+ p.symKeyPoolMu.Unlock()
+ if addtocache {
+ p.symKeyDecryptCacheCursor++
+ p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = &keyid
+ }
+ key, _ := p.GetSymmetricKey(keyid)
+ log.Trace("added symkey", "symkeyid", keyid, "symkey", common.ToHex(key), "topic", topic, "address", fmt.Sprintf("%p", address), "cache", addtocache)
+}
+
+// Returns a symmetric key byte sequence stored in the whisper backend
+// by its unique id
+//
+// Passes on the error value from the whisper backend
+func (p *Pss) GetSymmetricKey(symkeyid string) ([]byte, error) {
+ symkey, err := p.w.GetSymKey(symkeyid)
+ if err != nil {
+ return nil, err
+ }
+ return symkey, nil
+}
+
+// Returns all recorded topic and address combinations for a specific public key
+func (p *Pss) GetPublickeyPeers(keyid string) (topic []Topic, address []PssAddress, err error) {
+ p.pubKeyPoolMu.RLock()
+ defer p.pubKeyPoolMu.RUnlock()
+ for t, peer := range p.pubKeyPool[keyid] {
+ topic = append(topic, t)
+ address = append(address, *peer.address)
+ }
+
+ return topic, address, nil
+}
+
+func (p *Pss) getPeerAddress(keyid string, topic Topic) (PssAddress, error) {
+ p.pubKeyPoolMu.RLock()
+ defer p.pubKeyPoolMu.RUnlock()
+ if peers, ok := p.pubKeyPool[keyid]; ok {
+ if t, ok := peers[topic]; ok {
+ return *t.address, nil
+ }
+ }
+ return nil, fmt.Errorf("peer with pubkey %s, topic %x not found", keyid, topic)
+}
+
+// Attempt to decrypt, validate and unpack a
+// symmetrically encrypted message
+// If successful, returns the unpacked whisper ReceivedMessage struct
+// encapsulating the decrypted message, and the whisper backend id
+// of the symmetric key used to decrypt the message.
+// It fails if decryption of the message fails or if the message is corrupted
+func (p *Pss) processSym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) {
+ metrics.GetOrRegisterCounter("pss.process.sym", nil).Inc(1)
+
+ for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
+ symkeyid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
+ symkey, err := p.w.GetSymKey(*symkeyid)
+ if err != nil {
+ continue
+ }
+ recvmsg, err := envelope.OpenSymmetric(symkey)
+ if err != nil {
+ continue
+ }
+ if !recvmsg.Validate() {
+ return nil, "", nil, fmt.Errorf("symmetrically encrypted message has invalid signature or is corrupt")
+ }
+ p.symKeyPoolMu.Lock()
+ from := p.symKeyPool[*symkeyid][Topic(envelope.Topic)].address
+ p.symKeyPoolMu.Unlock()
+ p.symKeyDecryptCacheCursor++
+ p.symKeyDecryptCache[p.symKeyDecryptCacheCursor%cap(p.symKeyDecryptCache)] = symkeyid
+ return recvmsg, *symkeyid, from, nil
+ }
+ return nil, "", nil, fmt.Errorf("could not decrypt message")
+}
+
+// Attempt to decrypt, validate and unpack an
+// asymmetrically encrypted message
+// If successful, returns the unpacked whisper ReceivedMessage struct
+// encapsulating the decrypted message, and the byte representation of
+// the public key used to decrypt the message.
+// It fails if decryption of message fails, or if the message is corrupted
+func (p *Pss) processAsym(envelope *whisper.Envelope) (*whisper.ReceivedMessage, string, *PssAddress, error) {
+ metrics.GetOrRegisterCounter("pss.process.asym", nil).Inc(1)
+
+ recvmsg, err := envelope.OpenAsymmetric(p.privateKey)
+ if err != nil {
+ return nil, "", nil, fmt.Errorf("could not decrypt message: %s", err)
+ }
+ // check signature (if signed), strip padding
+ if !recvmsg.Validate() {
+ return nil, "", nil, fmt.Errorf("invalid message")
+ }
+ pubkeyid := common.ToHex(crypto.FromECDSAPub(recvmsg.Src))
+ var from *PssAddress
+ p.pubKeyPoolMu.Lock()
+ if p.pubKeyPool[pubkeyid][Topic(envelope.Topic)] != nil {
+ from = p.pubKeyPool[pubkeyid][Topic(envelope.Topic)].address
+ }
+ p.pubKeyPoolMu.Unlock()
+ return recvmsg, pubkeyid, from, nil
+}
+
+// Symkey garbage collection
+// a key is removed if:
+// - it is not marked as protected
+// - it is not in the incoming decryption cache
+func (p *Pss) cleanKeys() (count int) {
+ for keyid, peertopics := range p.symKeyPool {
+ var expiredtopics []Topic
+ for topic, psp := range peertopics {
+ if psp.protected {
+ continue
+ }
+
+ var match bool
+ for i := p.symKeyDecryptCacheCursor; i > p.symKeyDecryptCacheCursor-cap(p.symKeyDecryptCache) && i > 0; i-- {
+ cacheid := p.symKeyDecryptCache[i%cap(p.symKeyDecryptCache)]
+ if *cacheid == keyid {
+ match = true
+ }
+ }
+ if !match {
+ expiredtopics = append(expiredtopics, topic)
+ }
+ }
+ for _, topic := range expiredtopics {
+ p.symKeyPoolMu.Lock()
+ delete(p.symKeyPool[keyid], topic)
+ log.Trace("symkey cleanup deletion", "symkeyid", keyid, "topic", topic, "val", p.symKeyPool[keyid])
+ p.symKeyPoolMu.Unlock()
+ count++
+ }
+ }
+ return
+}
+
+/////////////////////////////////////////////////////////////////////
+// SECTION: Message sending
+/////////////////////////////////////////////////////////////////////
+
+func (p *Pss) enqueue(msg *PssMsg) error {
+ select {
+ case p.outbox <- msg:
+ return nil
+ default:
+ }
+
+ metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
+ return errors.New("outbox full")
+}
+
+// Send a raw message (any encryption is the responsibility of the calling client)
+//
+// Will fail if raw messages are disallowed
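+//
+// A minimal sketch, assuming AllowRaw was enabled in PssParams and
+// `theiraddr` is the recipient's (possibly partial) overlay address:
+//
+//  err := ps.SendRaw(PssAddress(theiraddr), topic, payload)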
+func (p *Pss) SendRaw(address PssAddress, topic Topic, msg []byte) error {
+ if !p.allowRaw {
+ return errors.New("Raw messages not enabled")
+ }
+ pssMsgParams := &msgParams{
+ raw: true,
+ }
+ payload := &whisper.Envelope{
+ Data: msg,
+ Topic: whisper.TopicType(topic),
+ }
+ pssMsg := newPssMsg(pssMsgParams)
+ pssMsg.To = address
+ pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
+ pssMsg.Payload = payload
+ p.addFwdCache(pssMsg)
+ return p.enqueue(pssMsg)
+}
+
+// Send a message using symmetric encryption
+//
+// Fails if the key id does not match any of the stored symmetric keys
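+//
+// A minimal sketch, assuming `symkeyid` was returned by SetSymmetricKey
+// with a non-nil address hint for the same topic:
+//
+//  err := ps.SendSym(symkeyid, topic, []byte("ping"))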
+func (p *Pss) SendSym(symkeyid string, topic Topic, msg []byte) error {
+ symkey, err := p.GetSymmetricKey(symkeyid)
+ if err != nil {
+ return fmt.Errorf("missing valid send symkey %s: %v", symkeyid, err)
+ }
+ p.symKeyPoolMu.Lock()
+ psp, ok := p.symKeyPool[symkeyid][topic]
+ p.symKeyPoolMu.Unlock()
+ if !ok {
+ return fmt.Errorf("invalid topic '%s' for symkey '%s'", topic.String(), symkeyid)
+ } else if psp.address == nil {
+ return fmt.Errorf("no address hint for topic '%s' symkey '%s'", topic.String(), symkeyid)
+ }
+ err = p.send(*psp.address, topic, msg, false, symkey)
+ return err
+}
+
+// Send a message using asymmetric encryption
+//
+// Fails if the key id does not match any of the stored public keys
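+//
+// A minimal sketch, assuming the recipient's key was registered with
+// SetPeerPublicKey and `pubkeyid` is its hex encoding, e.g.
+// common.ToHex(crypto.FromECDSAPub(theirpub)):
+//
+//  err := ps.SendAsym(pubkeyid, topic, []byte("ping"))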
+func (p *Pss) SendAsym(pubkeyid string, topic Topic, msg []byte) error {
+ if _, err := crypto.UnmarshalPubkey(common.FromHex(pubkeyid)); err != nil {
+ return fmt.Errorf("Cannot unmarshal pubkey: %x", pubkeyid)
+ }
+ p.pubKeyPoolMu.Lock()
+ psp, ok := p.pubKeyPool[pubkeyid][topic]
+ p.pubKeyPoolMu.Unlock()
+ if !ok {
+ return fmt.Errorf("invalid topic '%s' for pubkey '%s'", topic.String(), pubkeyid)
+ } else if psp.address == nil {
+ return fmt.Errorf("no address hint for topic '%s' pubkey '%s'", topic.String(), pubkeyid)
+ }
+ go func() {
+ p.send(*psp.address, topic, msg, true, common.FromHex(pubkeyid))
+ }()
+ return nil
+}
+
+// Send is payload agnostic, and will accept any byte slice as payload
+// It generates a whisper envelope for the specified recipient and topic,
+// and wraps the message payload in it.
+// TODO: Implement proper message padding
+func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []byte) error {
+ metrics.GetOrRegisterCounter("pss.send", nil).Inc(1)
+
+ if key == nil || bytes.Equal(key, []byte{}) {
+ return fmt.Errorf("Zero length key passed to pss send")
+ }
+ padding := make([]byte, p.paddingByteSize)
+ c, err := rand.Read(padding)
+ if err != nil {
+ return err
+ } else if c < p.paddingByteSize {
+ return fmt.Errorf("invalid padding length: %d", c)
+ }
+ wparams := &whisper.MessageParams{
+ TTL: defaultWhisperTTL,
+ Src: p.privateKey,
+ Topic: whisper.TopicType(topic),
+ WorkTime: defaultWhisperWorkTime,
+ PoW: defaultWhisperPoW,
+ Payload: msg,
+ Padding: padding,
+ }
+ if asymmetric {
+ pk, err := crypto.UnmarshalPubkey(key)
+ if err != nil {
+ return fmt.Errorf("Cannot unmarshal pubkey: %x", key)
+ }
+ wparams.Dst = pk
+ } else {
+ wparams.KeySym = key
+ }
+ // set up outgoing message container, which does encryption and envelope wrapping
+ woutmsg, err := whisper.NewSentMessage(wparams)
+ if err != nil {
+ return fmt.Errorf("failed to generate whisper message encapsulation: %v", err)
+ }
+ // performs encryption.
+ // Does not perform (or performs only negligible) proof-of-work, due to the very low difficulty setting.
+ // After this the message is ready for sending
+ envelope, err := woutmsg.Wrap(wparams)
+ if err != nil {
+ return fmt.Errorf("failed to perform whisper encryption: %v", err)
+ }
+ log.Trace("pssmsg whisper done", "env", envelope, "wparams payload", common.ToHex(wparams.Payload), "to", common.ToHex(to), "asym", asymmetric, "key", common.ToHex(key))
+
+ // prepare for devp2p transport
+ pssMsgParams := &msgParams{
+ sym: !asymmetric,
+ }
+ pssMsg := newPssMsg(pssMsgParams)
+ pssMsg.To = to
+ pssMsg.Expire = uint32(time.Now().Add(p.msgTTL).Unix())
+ pssMsg.Payload = envelope
+ return p.enqueue(pssMsg)
+}
+
+// Forwards a pss message to the peer(s) closest to the recipient address given in the PssMsg struct.
+// The recipient address can be of any length; it is matched against the most significant bytes
+// (the leftmost slice of equivalent length) of each peer address.
+func (p *Pss) forward(msg *PssMsg) error {
+ metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)
+
+ to := make([]byte, addressLength)
+ copy(to[:len(msg.To)], msg.To)
+
+ // send with kademlia
+ // find the closest peer to the recipient and attempt to send
+ sent := 0
+ p.Overlay.EachConn(to, 256, func(op network.OverlayConn, po int, isproxbin bool) bool {
+ // we need p2p.protocols.Peer.Send
+ // cast and resolve
+ sp, ok := op.(senderPeer)
+ if !ok {
+ log.Crit("Pss cannot use kademlia peer type")
+ return false
+ }
+ info := sp.Info()
+
+ // check if the peer is running pss
+ var ispss bool
+ for _, cap := range info.Caps {
+ if cap == p.capstring {
+ ispss = true
+ break
+ }
+ }
+ if !ispss {
+ log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
+ return true
+ }
+
+ // get the protocol peer from the forwarding peer cache
+ sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), op.Address())
+ p.fwdPoolMu.RLock()
+ pp := p.fwdPool[sp.Info().ID]
+ p.fwdPoolMu.RUnlock()
+
+ // attempt to send the message
+ err := pp.Send(msg)
+ if err != nil {
+ metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
+ log.Error(err.Error())
+ return true
+ }
+ sent++
+ log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))
+
+ // continue forwarding if:
+ // - the peer is the end recipient but the full address has not been disclosed
+ // - the peer address fully matches the partial recipient address
+ // - the peer is in the proxbin
+ if len(msg.To) < addressLength && bytes.Equal(msg.To, op.Address()[:len(msg.To)]) {
+ log.Trace("pss keep forwarding: partial address with full prefix match")
+ return true
+ } else if isproxbin {
+ log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(op.Address())))
+ return true
+ }
+ // at this point we stop forwarding, and the state is as follows:
+ // - the peer is the end recipient and we have its full address
+ // - we are not in the proxbin (directed routing)
+ // - the partial addresses don't fully match
+ return false
+ })
+
+ if sent == 0 {
+ log.Debug("unable to forward to any peers")
+ if err := p.enqueue(msg); err != nil {
+ metrics.GetOrRegisterCounter("pss.forward.enqueue.error", nil).Inc(1)
+ log.Error(err.Error())
+ return err
+ }
+ }
+
+ // cache the message
+ p.addFwdCache(msg)
+ return nil
+}
+
+/////////////////////////////////////////////////////////////////////
+// SECTION: Caching
+/////////////////////////////////////////////////////////////////////
+
+// cleanFwdCache is used to periodically remove expired entries from the forward cache
+func (p *Pss) cleanFwdCache() {
+ metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1)
+ p.fwdCacheMu.Lock()
+ defer p.fwdCacheMu.Unlock()
+ for k, v := range p.fwdCache {
+ if v.expiresAt.Before(time.Now()) {
+ delete(p.fwdCache, k)
+ }
+ }
+}
+
+// add a message to the cache
+func (p *Pss) addFwdCache(msg *PssMsg) error {
+ metrics.GetOrRegisterCounter("pss.addfwdcache", nil).Inc(1)
+
+ var entry pssCacheEntry
+ var ok bool
+
+ p.fwdCacheMu.Lock()
+ defer p.fwdCacheMu.Unlock()
+
+ digest := p.digest(msg)
+ if entry, ok = p.fwdCache[digest]; !ok {
+ entry = pssCacheEntry{}
+ }
+ entry.expiresAt = time.Now().Add(p.cacheTTL)
+ p.fwdCache[digest] = entry
+ return nil
+}
+
+// check if message is in the cache
+func (p *Pss) checkFwdCache(msg *PssMsg) bool {
+ p.fwdCacheMu.Lock()
+ defer p.fwdCacheMu.Unlock()
+
+ digest := p.digest(msg)
+ entry, ok := p.fwdCache[digest]
+ if ok {
+ if entry.expiresAt.After(time.Now()) {
+ log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
+ metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
+ return true
+ }
+ metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1)
+ }
+ return false
+}
+
+// Digest of message
+func (p *Pss) digest(msg *PssMsg) pssDigest {
+ hasher := p.hashPool.Get().(storage.SwarmHash)
+ defer p.hashPool.Put(hasher)
+ hasher.Reset()
+ hasher.Write(msg.serialize())
+ digest := pssDigest{}
+ key := hasher.Sum(nil)
+ copy(digest[:], key[:digestLength])
+ return digest
+}