aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv5/whisper.go
diff options
context:
space:
mode:
authorgluk256 <gluk256@users.noreply.github.com>2017-04-10 05:49:22 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-04-10 05:49:22 +0800
commit9cd713551627a9b48e04a77f64a15ea6f829dcf4 (patch)
treef1849e61cd8269dfe9c70861f49216cf78acec3b /whisper/whisperv5/whisper.go
parent8570ef19eb8dfe4e2a450525c589dec291f3a517 (diff)
downloaddexon-9cd713551627a9b48e04a77f64a15ea6f829dcf4.tar.gz
dexon-9cd713551627a9b48e04a77f64a15ea6f829dcf4.tar.zst
dexon-9cd713551627a9b48e04a77f64a15ea6f829dcf4.zip
whisper: big refactoring (#13852)
* whisper: GetMessages fixed; size restriction updated * whisper: made PoW and MaxMsgSize customizable * whisper: test added * whisper: sym key management changed * whisper: identity management refactored * whisper: API refactoring (Post and Filter) * whisper: big refactoring complete * whisper: spelling fix * whisper: variable topic size allowed for a filter * whisper: final update * whisper: formatting * whisper: file exchange introduced in wnode * whisper: bugfix * whisper: API updated + new tests * whisper: statistics updated * whisper: wnode server updated * whisper: allowed filtering for variable topic size * whisper: tests added * whisper: resolving merge conflicts * whisper: refactoring (documenting mostly) * whsiper: tests fixed * whisper: down cased error messages * whisper: documenting the API functions * whisper: logging fixed * whisper: fixed wnode parameters * whisper: logs fixed (typos)
Diffstat (limited to 'whisper/whisperv5/whisper.go')
-rw-r--r--whisper/whisperv5/whisper.go373
1 files changed, 258 insertions, 115 deletions
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go
index 5062f7b6b..c4d5d04a7 100644
--- a/whisper/whisperv5/whisper.go
+++ b/whisper/whisperv5/whisper.go
@@ -31,59 +31,62 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2"
set "gopkg.in/fatih/set.v0"
)
type Statistics struct {
- messagesCleared int
- memoryCleared int
- totalMemoryUsed int
+ messagesCleared int
+ memoryCleared int
+ memoryUsed int
+ cycles int
+ totalMessagesCleared int
}
// Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Whisper struct {
- protocol p2p.Protocol
- filters *Filters
+ protocol p2p.Protocol // Protocol description and parameters
+ filters *Filters // Message filters installed with Subscribe function
- privateKeys map[string]*ecdsa.PrivateKey
- symKeys map[string][]byte
- keyMu sync.RWMutex
+ privateKeys map[string]*ecdsa.PrivateKey // Private key storage
+ symKeys map[string][]byte // Symmetric key storage
+ keyMu sync.RWMutex // Mutex associated with key storages
- envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
- messages map[common.Hash]*ReceivedMessage // Pool of successfully decrypted messages, which are not expired yet
- expirations map[uint32]*set.SetNonTS // Message expiration pool
- poolMu sync.RWMutex // Mutex to sync the message and expiration pools
+ envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
+ expirations map[uint32]*set.SetNonTS // Message expiration pool
+ poolMu sync.RWMutex // Mutex to sync the message and expiration pools
peers map[*Peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set
- mailServer MailServer
+ messageQueue chan *Envelope // Message queue for normal whisper messages
+ p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
+ quit chan struct{} // Channel used for graceful exit
- messageQueue chan *Envelope
- p2pMsgQueue chan *Envelope
- quit chan struct{}
+ minPoW float64 // Minimal PoW required by the whisper node
+ maxMsgLength int // Maximal message length allowed by the whisper node
+ overflow bool // Indicator of message queue overflow
- stats Statistics
+ stats Statistics // Statistics of whisper node
- overflow bool
- test bool
+ mailServer MailServer // MailServer interface
}
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
-// Param s should be passed if you want to implement mail server, otherwise nil.
func New() *Whisper {
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
- messages: make(map[common.Hash]*ReceivedMessage),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
+ minPoW: DefaultMinimumPoW,
+ maxMsgLength: DefaultMaxMessageLength,
}
whisper.filters = NewFilters(whisper)
@@ -110,6 +113,8 @@ func (w *Whisper) APIs() []rpc.API {
}
}
+// RegisterServer registers MailServer interface.
+// MailServer will process all the incoming messages with p2pRequestCode.
func (w *Whisper) RegisterServer(server MailServer) {
w.mailServer = server
}
@@ -124,6 +129,25 @@ func (w *Whisper) Version() uint {
return w.protocol.Version
}
+// SetMaxMessageLength sets the maximal message length allowed by this node
+func (w *Whisper) SetMaxMessageLength(val int) error {
+ if val <= 0 {
+ return fmt.Errorf("invalid message length: %d", val)
+ }
+ w.maxMsgLength = val
+ return nil
+}
+
+// SetMinimumPoW sets the minimal PoW required by this node
+func (w *Whisper) SetMinimumPoW(val float64) error {
+ if val <= 0.0 {
+ return fmt.Errorf("invalid PoW: %f", val)
+ }
+ w.minPoW = val
+ return nil
+}
+
+// getPeer retrieves peer by ID
func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
w.peerMu.Lock()
defer w.peerMu.Unlock()
@@ -136,9 +160,9 @@ func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
return nil, fmt.Errorf("Could not find peer with ID: %x", peerID)
}
-// MarkPeerTrusted marks specific peer trusted, which will allow it
-// to send historic (expired) messages.
-func (w *Whisper) MarkPeerTrusted(peerID []byte) error {
+// AllowP2PMessagesFromPeer marks specific peer trusted,
+// which will allow it to send historic (expired) messages.
+func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
p, err := w.getPeer(peerID)
if err != nil {
return err
@@ -147,6 +171,11 @@ func (w *Whisper) MarkPeerTrusted(peerID []byte) error {
return nil
}
+// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer,
+// which is known to implement MailServer interface, and is supposed to process this
+// request and respond with a number of peer-to-peer messages (possibly expired),
+// which are not supposed to be forwarded any further.
+// The whisper protocol is agnostic of the format and contents of envelope.
func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error {
p, err := w.getPeer(peerID)
if err != nil {
@@ -156,153 +185,226 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) err
return p2p.Send(p.ws, p2pRequestCode, envelope)
}
+// SendP2PMessage sends a peer-to-peer message to a specific peer.
func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
p, err := w.getPeer(peerID)
if err != nil {
return err
}
- return p2p.Send(p.ws, p2pCode, envelope)
+ return w.SendP2PDirect(p, envelope)
}
+// SendP2PDirect sends a peer-to-peer message to a specific peer.
func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
return p2p.Send(peer.ws, p2pCode, envelope)
}
-// NewIdentity generates a new cryptographic identity for the client, and injects
-// it into the known identities for message decryption.
-func (w *Whisper) NewIdentity() *ecdsa.PrivateKey {
+// NewKeyPair generates a new cryptographic identity for the client, and injects
+// it into the known identities for message decryption. Returns ID of the new key pair.
+func (w *Whisper) NewKeyPair() (string, error) {
key, err := crypto.GenerateKey()
if err != nil || !validatePrivateKey(key) {
key, err = crypto.GenerateKey() // retry once
}
if err != nil {
- panic(err)
+ return "", err
}
if !validatePrivateKey(key) {
- panic("Failed to generate valid key")
+ return "", fmt.Errorf("failed to generate valid key")
}
+
+ id, err := GenerateRandomID()
+ if err != nil {
+ return "", fmt.Errorf("failed to generate ID: %s", err)
+ }
+
w.keyMu.Lock()
defer w.keyMu.Unlock()
- w.privateKeys[common.ToHex(crypto.FromECDSAPub(&key.PublicKey))] = key
- return key
+
+ if w.privateKeys[id] != nil {
+ return "", fmt.Errorf("failed to generate unique ID")
+ }
+ w.privateKeys[id] = key
+ return id, nil
}
-// DeleteIdentity deletes the specified key if it exists.
-func (w *Whisper) DeleteIdentity(key string) {
+// DeleteKeyPair deletes the specified key if it exists.
+func (w *Whisper) DeleteKeyPair(key string) bool {
w.keyMu.Lock()
defer w.keyMu.Unlock()
- delete(w.privateKeys, key)
+
+ if w.privateKeys[key] != nil {
+ delete(w.privateKeys, key)
+ return true
+ }
+ return false
}
-// HasIdentity checks if the the whisper node is configured with the private key
+// HasKeyPair checks if the the whisper node is configured with the private key
// of the specified public pair.
-func (w *Whisper) HasIdentity(pubKey string) bool {
+func (w *Whisper) HasKeyPair(id string) bool {
w.keyMu.RLock()
defer w.keyMu.RUnlock()
- return w.privateKeys[pubKey] != nil
+ return w.privateKeys[id] != nil
}
-// GetIdentity retrieves the private key of the specified public identity.
-func (w *Whisper) GetIdentity(pubKey string) *ecdsa.PrivateKey {
+// GetPrivateKey retrieves the private key of the specified identity.
+func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
w.keyMu.RLock()
defer w.keyMu.RUnlock()
- return w.privateKeys[pubKey]
+ key := w.privateKeys[id]
+ if key == nil {
+ return nil, fmt.Errorf("invalid id")
+ }
+ return key, nil
}
-func (w *Whisper) GenerateSymKey(name string) error {
+// GenerateSymKey generates a random symmetric key and stores it under id,
+// which is then returned. Will be used in the future for session key exchange.
+func (w *Whisper) GenerateSymKey() (string, error) {
const size = aesKeyLength * 2
buf := make([]byte, size)
_, err := crand.Read(buf)
if err != nil {
- return err
+ return "", err
} else if !validateSymmetricKey(buf) {
- return fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data")
+ return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data")
}
key := buf[:aesKeyLength]
salt := buf[aesKeyLength:]
derived, err := DeriveOneTimeKey(key, salt, EnvelopeVersion)
if err != nil {
- return err
+ return "", err
} else if !validateSymmetricKey(derived) {
- return fmt.Errorf("failed to derive valid key")
+ return "", fmt.Errorf("failed to derive valid key")
+ }
+
+ id, err := GenerateRandomID()
+ if err != nil {
+ return "", fmt.Errorf("failed to generate ID: %s", err)
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
- if w.symKeys[name] != nil {
- return fmt.Errorf("Key with name [%s] already exists", name)
+ if w.symKeys[id] != nil {
+ return "", fmt.Errorf("failed to generate unique ID")
}
- w.symKeys[name] = derived
- return nil
+ w.symKeys[id] = derived
+ return id, nil
}
-func (w *Whisper) AddSymKey(name string, key []byte) error {
- if w.HasSymKey(name) {
- return fmt.Errorf("Key with name [%s] already exists", name)
+// AddSymKeyDirect stores the key, and returns its id.
+func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) {
+ if len(key) != aesKeyLength {
+ return "", fmt.Errorf("wrong key size: %d", len(key))
}
- derived, err := deriveKeyMaterial(key, EnvelopeVersion)
+ id, err := GenerateRandomID()
if err != nil {
- return err
+ return "", fmt.Errorf("failed to generate ID: %s", err)
}
w.keyMu.Lock()
defer w.keyMu.Unlock()
- // double check is necessary, because deriveKeyMaterial() is slow
- if w.symKeys[name] != nil {
- return fmt.Errorf("Key with name [%s] already exists", name)
+ if w.symKeys[id] != nil {
+ return "", fmt.Errorf("failed to generate unique ID")
}
- w.symKeys[name] = derived
- return nil
+ w.symKeys[id] = key
+ return id, nil
}
-func (w *Whisper) HasSymKey(name string) bool {
+// AddSymKeyFromPassword generates the key from password, stores it, and returns its id.
+func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) {
+ id, err := GenerateRandomID()
+ if err != nil {
+ return "", fmt.Errorf("failed to generate ID: %s", err)
+ }
+ if w.HasSymKey(id) {
+ return "", fmt.Errorf("failed to generate unique ID")
+ }
+
+ derived, err := deriveKeyMaterial([]byte(password), EnvelopeVersion)
+ if err != nil {
+ return "", err
+ }
+
+ w.keyMu.Lock()
+ defer w.keyMu.Unlock()
+
+ // double check is necessary, because deriveKeyMaterial() is very slow
+ if w.symKeys[id] != nil {
+ return "", fmt.Errorf("critical error: failed to generate unique ID")
+ }
+ w.symKeys[id] = derived
+ return id, nil
+}
+
+// HasSymKey returns true if there is a key associated with the given id.
+// Otherwise returns false.
+func (w *Whisper) HasSymKey(id string) bool {
w.keyMu.RLock()
defer w.keyMu.RUnlock()
- return w.symKeys[name] != nil
+ return w.symKeys[id] != nil
}
-func (w *Whisper) DeleteSymKey(name string) {
+// DeleteSymKey deletes the key associated with the name string if it exists.
+func (w *Whisper) DeleteSymKey(id string) bool {
w.keyMu.Lock()
defer w.keyMu.Unlock()
- delete(w.symKeys, name)
+ if w.symKeys[id] != nil {
+ delete(w.symKeys, id)
+ return true
+ }
+ return false
}
-func (w *Whisper) GetSymKey(name string) []byte {
+// GetSymKey returns the symmetric key associated with the given id.
+func (w *Whisper) GetSymKey(id string) ([]byte, error) {
w.keyMu.RLock()
defer w.keyMu.RUnlock()
- return w.symKeys[name]
+ if w.symKeys[id] != nil {
+ return w.symKeys[id], nil
+ }
+ return nil, fmt.Errorf("non-existent key ID")
}
-// Watch installs a new message handler to run in case a matching packet arrives
-// from the whisper network.
-func (w *Whisper) Watch(f *Filter) (string, error) {
+// Subscribe installs a new message handler used for filtering, decrypting
+// and subsequent storing of incoming messages.
+func (w *Whisper) Subscribe(f *Filter) (string, error) {
return w.filters.Install(f)
}
+// GetFilter returns the filter by id.
func (w *Whisper) GetFilter(id string) *Filter {
return w.filters.Get(id)
}
-// Unwatch removes an installed message handler.
-func (w *Whisper) Unwatch(id string) {
- w.filters.Uninstall(id)
+// Unsubscribe removes an installed message handler.
+func (w *Whisper) Unsubscribe(id string) error {
+ ok := w.filters.Uninstall(id)
+ if !ok {
+ return fmt.Errorf("Unsubscribe: Invalid ID")
+ }
+ return nil
}
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
func (w *Whisper) Send(envelope *Envelope) error {
- _, err := w.add(envelope)
+ ok, err := w.add(envelope)
+ if !ok {
+ return fmt.Errorf("failed to add envelope")
+ }
return err
}
// Start implements node.Service, starting the background data propagation thread
// of the Whisper protocol.
func (w *Whisper) Start(*p2p.Server) error {
- log.Info(fmt.Sprint("Whisper started"))
+ log.Info("started whisper v." + ProtocolVersionStr)
go w.update()
numCPU := runtime.NumCPU()
@@ -317,11 +419,11 @@ func (w *Whisper) Start(*p2p.Server) error {
// of the Whisper protocol.
func (w *Whisper) Stop() error {
close(w.quit)
- log.Info(fmt.Sprint("Whisper stopped"))
+ log.Info("whisper stopped")
return nil
}
-// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
+// HandlePeer is called by the underlying P2P layer when the whisper sub-protocol
// connection is negotiated.
func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
// Create the new peer and start tracking it
@@ -353,26 +455,31 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// fetch the next packet
packet, err := rw.ReadMsg()
if err != nil {
+ log.Warn("message loop", "peer", p.peer.ID(), "err", err)
return err
}
+ if packet.Size > uint32(wh.maxMsgLength) {
+ log.Warn("oversized message received", "peer", p.peer.ID())
+ return errors.New("oversized message received")
+ }
switch packet.Code {
case statusCode:
// this should not happen, but no need to panic; just ignore this message.
- log.Warn(fmt.Sprintf("%v: unxepected status message received", p.peer))
+ log.Warn("unxepected status message received", "peer", p.peer.ID())
case messagesCode:
// decode the contained envelopes
var envelopes []*Envelope
if err := packet.Decode(&envelopes); err != nil {
- log.Warn(fmt.Sprintf("%v: failed to decode envelope: [%v], peer will be disconnected", p.peer, err))
- return fmt.Errorf("garbage received")
+ log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid envelope")
}
// inject all envelopes into the internal pool
for _, envelope := range envelopes {
cached, err := wh.add(envelope)
if err != nil {
- log.Warn(fmt.Sprintf("%v: bad envelope received: [%v], peer will be disconnected", p.peer, err))
- return fmt.Errorf("invalid envelope")
+ log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid envelope")
}
if cached {
p.mark(envelope)
@@ -386,8 +493,8 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if p.trusted {
var envelope Envelope
if err := packet.Decode(&envelope); err != nil {
- log.Warn(fmt.Sprintf("%v: failed to decode direct message: [%v], peer will be disconnected", p.peer, err))
- return fmt.Errorf("garbage received (directMessage)")
+ log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid direct message")
}
wh.postEvent(&envelope, true)
}
@@ -396,8 +503,8 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if wh.mailServer != nil {
var request Envelope
if err := packet.Decode(&request); err != nil {
- log.Warn(fmt.Sprintf("%v: failed to decode p2p request message: [%v], peer will be disconnected", p.peer, err))
- return fmt.Errorf("garbage received (p2p request)")
+ log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid p2p request")
}
wh.mailServer.DeliverMail(p, &request)
}
@@ -430,12 +537,12 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
if envelope.Expiry+SynchAllowance*2 < now {
return false, fmt.Errorf("very old message")
} else {
- log.Debug(fmt.Sprintf("expired envelope dropped [%x]", envelope.Hash()))
+ log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error
}
}
- if len(envelope.Data) > MaxMessageLength {
+ if envelope.size() > wh.maxMsgLength {
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
@@ -453,8 +560,8 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return false, fmt.Errorf("oversized salt [%x]", envelope.Hash())
}
- if envelope.PoW() < MinimumPoW && !wh.test {
- log.Debug(fmt.Sprintf("envelope with low PoW dropped: %f [%x]", envelope.PoW(), envelope.Hash()))
+ if envelope.PoW() < wh.minPoW {
+ log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error
}
@@ -474,10 +581,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
wh.poolMu.Unlock()
if alreadyCached {
- log.Trace(fmt.Sprintf("whisper envelope already cached [%x]\n", envelope.Hash()))
+ log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
} else {
- log.Trace(fmt.Sprintf("cached whisper envelope [%x]: %v\n", envelope.Hash(), envelope))
- wh.stats.totalMemoryUsed += envelope.size()
+ log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
+ wh.stats.memoryUsed += envelope.size()
wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil {
wh.mailServer.Archive(envelope)
@@ -508,11 +615,12 @@ func (w *Whisper) checkOverflow() {
if queueSize == messageQueueLimit {
if !w.overflow {
w.overflow = true
- log.Warn(fmt.Sprint("message queue overflow"))
+ log.Warn("message queue overflow")
}
} else if queueSize <= messageQueueLimit/2 {
if w.overflow {
w.overflow = false
+ log.Warn("message queue overflow fixed (back to normal)")
}
}
}
@@ -558,19 +666,17 @@ func (w *Whisper) expire() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
- w.stats.clear()
+ w.stats.reset()
now := uint32(time.Now().Unix())
for expiry, hashSet := range w.expirations {
if expiry < now {
- w.stats.messagesCleared++
-
// Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool {
sz := w.envelopes[v.(common.Hash)].size()
- w.stats.memoryCleared += sz
- w.stats.totalMemoryUsed -= sz
delete(w.envelopes, v.(common.Hash))
- delete(w.messages, v.(common.Hash))
+ w.stats.messagesCleared++
+ w.stats.memoryCleared += sz
+ w.stats.memoryUsed -= sz
return true
})
w.expirations[expiry].Clear()
@@ -579,12 +685,21 @@ func (w *Whisper) expire() {
}
}
+// Stats returns the whisper node statistics.
func (w *Whisper) Stats() string {
- return fmt.Sprintf("Latest expiry cycle cleared %d messages (%d bytes). Memory usage: %d bytes.",
- w.stats.messagesCleared, w.stats.memoryCleared, w.stats.totalMemoryUsed)
+ result := fmt.Sprintf("Memory usage: %d bytes. Average messages cleared per expiry cycle: %d. Total messages cleared: %d.",
+ w.stats.memoryUsed, w.stats.totalMessagesCleared/w.stats.cycles, w.stats.totalMessagesCleared)
+ if w.stats.messagesCleared > 0 {
+ result += fmt.Sprintf(" Latest expiry cycle cleared %d messages (%d bytes).",
+ w.stats.messagesCleared, w.stats.memoryCleared)
+ }
+ if w.overflow {
+ result += " Message queue state: overflow."
+ }
+ return result
}
-// envelopes retrieves all the messages currently pooled by the node.
+// Envelopes retrieves all the messages currently pooled by the node.
func (w *Whisper) Envelopes() []*Envelope {
w.poolMu.RLock()
defer w.poolMu.RUnlock()
@@ -596,15 +711,17 @@ func (w *Whisper) Envelopes() []*Envelope {
return all
}
-// Messages retrieves all the decrypted messages matching a filter id.
+// Messages iterates through all currently floating envelopes
+// and retrieves all the messages, that this filter could decrypt.
func (w *Whisper) Messages(id string) []*ReceivedMessage {
result := make([]*ReceivedMessage, 0)
w.poolMu.RLock()
defer w.poolMu.RUnlock()
if filter := w.filters.Get(id); filter != nil {
- for _, msg := range w.messages {
- if filter.MatchMessage(msg) {
+ for _, env := range w.envelopes {
+ msg := filter.processEnvelope(env)
+ if msg != nil {
result = append(result, msg)
}
}
@@ -612,6 +729,7 @@ func (w *Whisper) Messages(id string) []*ReceivedMessage {
return result
}
+// isEnvelopeCached checks if envelope with specific hash has already been received and cached.
func (w *Whisper) isEnvelopeCached(hash common.Hash) bool {
w.poolMu.Lock()
defer w.poolMu.Unlock()
@@ -620,22 +738,30 @@ func (w *Whisper) isEnvelopeCached(hash common.Hash) bool {
return exist
}
-func (w *Whisper) addDecryptedMessage(msg *ReceivedMessage) {
- w.poolMu.Lock()
- defer w.poolMu.Unlock()
-
- w.messages[msg.EnvelopeHash] = msg
-}
+// reset resets the node's statistics after each expiry cycle.
+func (s *Statistics) reset() {
+ s.cycles++
+ s.totalMessagesCleared += s.messagesCleared
-func (s *Statistics) clear() {
s.memoryCleared = 0
s.messagesCleared = 0
}
+// ValidateKeyID checks the format of key id.
+func ValidateKeyID(id string) error {
+ const target = keyIdSize * 2
+ if len(id) != target {
+ return fmt.Errorf("wrong size of key ID (expected %d bytes, got %d)", target, len(id))
+ }
+ return nil
+}
+
+// ValidatePublicKey checks the format of the given public key.
func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0
}
+// validatePrivateKey checks the format of the given private key.
func validatePrivateKey(k *ecdsa.PrivateKey) bool {
if k == nil || k.D == nil || k.D.Sign() == 0 {
return false
@@ -648,6 +774,7 @@ func validateSymmetricKey(k []byte) bool {
return len(k) > 0 && !containsOnlyZeros(k)
}
+// containsOnlyZeros checks if the data contain only zeros.
func containsOnlyZeros(data []byte) bool {
for _, b := range data {
if b != 0 {
@@ -657,7 +784,8 @@ func containsOnlyZeros(data []byte) bool {
return true
}
-func bytesToIntLittleEndian(b []byte) (res uint64) {
+// bytesToUintLittleEndian converts the slice to 64-bit unsigned integer.
+func bytesToUintLittleEndian(b []byte) (res uint64) {
mul := uint64(1)
for i := 0; i < len(b); i++ {
res += uint64(b[i]) * mul
@@ -666,7 +794,8 @@ func bytesToIntLittleEndian(b []byte) (res uint64) {
return res
}
-func BytesToIntBigEndian(b []byte) (res uint64) {
+// BytesToUintBigEndian converts the slice to 64-bit unsigned integer.
+func BytesToUintBigEndian(b []byte) (res uint64) {
for i := 0; i < len(b); i++ {
res *= 256
res += uint64(b[i])
@@ -674,7 +803,7 @@ func BytesToIntBigEndian(b []byte) (res uint64) {
return res
}
-// DeriveSymmetricKey derives symmetric key material from the key or password.
+// deriveKeyMaterial derives symmetric key material from the key or password.
// pbkdf2 is used for security, in case people use password instead of randomly generated keys.
func deriveKeyMaterial(key []byte, version uint64) (derivedKey []byte, err error) {
if version == 0 {
@@ -686,3 +815,17 @@ func deriveKeyMaterial(key []byte, version uint64) (derivedKey []byte, err error
return nil, unknownVersionError(version)
}
}
+
+// GenerateRandomID generates a random string, which is then returned to be used as a key id
+func GenerateRandomID() (id string, err error) {
+ buf := make([]byte, keyIdSize)
+ _, err = crand.Read(buf)
+ if err != nil {
+ return "", err
+ }
+ if !validateSymmetricKey(buf) {
+ return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data")
+ }
+ id = common.Bytes2Hex(buf)
+ return id, err
+}