diff options
Diffstat (limited to 'whisper/whisperv5/whisper.go')
-rw-r--r-- | whisper/whisperv5/whisper.go | 145 |
1 files changed, 103 insertions, 42 deletions
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go index f2aad08ef..cc6c65a9f 100644 --- a/whisper/whisperv5/whisper.go +++ b/whisper/whisperv5/whisper.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "sync/atomic" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" @@ -44,6 +46,38 @@ type Statistics struct { totalMessagesCleared int } +type settingType byte +type settingsMap map[settingType]interface{} + +const ( + minPowIdx settingType = iota // Minimal PoW required by the whisper node + maxMsgSizeIdx settingType = iota // Maximal message length allowed by the whisper node + OverflowIdx settingType = iota // Indicator of message queue overflow +) + +type settingsVault struct { + vaultMu sync.Mutex + vault atomic.Value +} + +func (s *settingsVault) get(idx settingType) interface{} { + m := s.vault.Load().(settingsMap) + return m[idx] +} + +func (s *settingsVault) store(idx settingType, val interface{}) { + s.vaultMu.Lock() + defer s.vaultMu.Unlock() + + m1 := s.vault.Load().(settingsMap) + m2 := make(settingsMap) + for k, v := range m1 { + m2[k] = v + } + m2[idx] = val + s.vault.Store(m2) +} + // Whisper represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Whisper struct { @@ -54,28 +88,27 @@ type Whisper struct { symKeys map[string][]byte // Symmetric key storage keyMu sync.RWMutex // Mutex associated with key storages + poolMu sync.RWMutex // Mutex to sync the message and expiration pools envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node expirations map[uint32]*set.SetNonTS // Message expiration pool - poolMu sync.RWMutex // Mutex to sync the message and expiration pools - peers map[*Peer]struct{} // Set of currently active peers peerMu sync.RWMutex // Mutex to sync the active peer set + peers map[*Peer]struct{} // Set of currently active peers messageQueue chan *Envelope // Message queue for normal whisper messages p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further) quit chan struct{} // Channel used for graceful exit - minPoW float64 // Minimal PoW required by the whisper node - maxMsgLength int // Maximal message length allowed by the whisper node - overflow bool // Indicator of message queue overflow + settings settingsVault // holds configuration settings that can be dynamically changed - stats Statistics // Statistics of whisper node + statsMu sync.Mutex // guard stats + stats Statistics // Statistics of whisper node mailServer MailServer // MailServer interface } // New creates a Whisper client ready to communicate through the Ethereum P2P network. -func New() *Whisper { +func New(cfg *Config) *Whisper { whisper := &Whisper{ privateKeys: make(map[string]*ecdsa.PrivateKey), symKeys: make(map[string][]byte), @@ -85,22 +118,47 @@ func New() *Whisper { messageQueue: make(chan *Envelope, messageQueueLimit), p2pMsgQueue: make(chan *Envelope, messageQueueLimit), quit: make(chan struct{}), - minPoW: DefaultMinimumPoW, - maxMsgLength: DefaultMaxMessageLength, } + whisper.filters = NewFilters(whisper) + whisper.settings.vault.Store(make(settingsMap)) + whisper.settings.store(minPowIdx, cfg.MinimumAcceptedPOW) + whisper.settings.store(maxMsgSizeIdx, cfg.MaxMessageSize) + whisper.settings.store(OverflowIdx, false) + // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ Name: ProtocolName, Version: uint(ProtocolVersion), Length: NumberOfMessageCodes, Run: whisper.HandlePeer, + NodeInfo: func() interface{} { + return map[string]interface{}{ + "version": ProtocolVersionStr, + "maxMessageSize": whisper.MaxMessageSize(), + "minimumPoW": whisper.MinPow(), + } + }, } return whisper } +func (w *Whisper) MinPow() float64 { + return w.settings.get(minPowIdx).(float64) +} + +// MaxMessageSize returns the maximum accepted message size. +func (w *Whisper) MaxMessageSize() uint32 { + return w.settings.get(maxMsgSizeIdx).(uint32) +} + +// Overflow returns an indication if the message queue is full. +func (w *Whisper) Overflow() bool { + return w.settings.get(OverflowIdx).(bool) +} + // APIs returns the RPC descriptors the Whisper implementation offers func (w *Whisper) APIs() []rpc.API { return []rpc.API{ @@ -129,12 +187,12 @@ func (w *Whisper) Version() uint { return w.protocol.Version } -// SetMaxMessageLength sets the maximal message length allowed by this node -func (w *Whisper) SetMaxMessageLength(val int) error { - if val <= 0 { - return fmt.Errorf("invalid message length: %d", val) +// SetMaxMessageSize sets the maximal message size allowed by this node +func (w *Whisper) SetMaxMessageSize(size uint32) error { + if size > MaxMessageSize { + return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize) } - w.maxMsgLength = val + w.settings.store(maxMsgSizeIdx, uint32(size)) return nil } @@ -143,7 +201,7 @@ func (w *Whisper) SetMinimumPoW(val float64) error { if val <= 0.0 { return fmt.Errorf("invalid PoW: %f", val) } - w.minPoW = val + w.settings.store(minPowIdx, val) return nil } @@ -240,6 +298,20 @@ func (w *Whisper) DeleteKeyPair(key string) bool { return false } +// AddKeyPair imports a asymmetric private key and returns it identifier. +func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { + id, err := GenerateRandomID() + if err != nil { + return "", fmt.Errorf("failed to generate ID: %s", err) + } + + w.keyMu.Lock() + w.privateKeys[id] = key + w.keyMu.Unlock() + + return id, nil +} + // HasKeyPair checks if the the whisper node is configured with the private key // of the specified public pair. func (w *Whisper) HasKeyPair(id string) bool { @@ -451,7 +523,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("message loop", "peer", p.peer.ID(), "err", err) return err } - if packet.Size > uint32(wh.maxMsgLength) { + if packet.Size > wh.MaxMessageSize() { log.Warn("oversized message received", "peer", p.peer.ID()) return errors.New("oversized message received") } @@ -532,7 +604,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { } } - if envelope.size() > wh.maxMsgLength { + if uint32(envelope.size()) > wh.MaxMessageSize() { return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } @@ -547,7 +619,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash()) } - if envelope.PoW() < wh.minPoW { + if envelope.PoW() < wh.MinPow() { log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) return false, nil // drop envelope without error } @@ -571,7 +643,9 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) } else { log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) + wh.statsMu.Lock() wh.stats.memoryUsed += envelope.size() + wh.statsMu.Unlock() wh.postEvent(envelope, false) // notify the local node about the new message if wh.mailServer != nil { wh.mailServer.Archive(envelope) @@ -600,13 +674,13 @@ func (w *Whisper) checkOverflow() { queueSize := len(w.messageQueue) if queueSize == messageQueueLimit { - if !w.overflow { - w.overflow = true + if !w.Overflow() { + w.settings.store(OverflowIdx, true) log.Warn("message queue overflow") } } else if queueSize <= messageQueueLimit/2 { - if w.overflow { - w.overflow = false + if w.Overflow() { + w.settings.store(OverflowIdx, false) log.Warn("message queue overflow fixed (back to normal)") } } @@ -653,6 +727,8 @@ func (w *Whisper) expire() { w.poolMu.Lock() defer w.poolMu.Unlock() + w.statsMu.Lock() + defer w.statsMu.Unlock() w.stats.reset() now := uint32(time.Now().Unix()) for expiry, hashSet := range w.expirations { @@ -673,17 +749,11 @@ func (w *Whisper) expire() { } // Stats returns the whisper node statistics. -func (w *Whisper) Stats() string { - result := fmt.Sprintf("Memory usage: %d bytes. Average messages cleared per expiry cycle: %d. Total messages cleared: %d.", - w.stats.memoryUsed, w.stats.totalMessagesCleared/w.stats.cycles, w.stats.totalMessagesCleared) - if w.stats.messagesCleared > 0 { - result += fmt.Sprintf(" Latest expiry cycle cleared %d messages (%d bytes).", - w.stats.messagesCleared, w.stats.memoryCleared) - } - if w.overflow { - result += " Message queue state: overflow." - } - return result +func (w *Whisper) Stats() Statistics { + w.statsMu.Lock() + defer w.statsMu.Unlock() + + return w.stats } // Envelopes retrieves all the messages currently pooled by the node. @@ -734,15 +804,6 @@ func (s *Statistics) reset() { s.messagesCleared = 0 } -// ValidateKeyID checks the format of key id. -func ValidateKeyID(id string) error { - const target = keyIdSize * 2 - if len(id) != target { - return fmt.Errorf("wrong size of key ID (expected %d bytes, got %d)", target, len(id)) - } - return nil -} - // ValidatePublicKey checks the format of the given public key. func ValidatePublicKey(k *ecdsa.PublicKey) bool { return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0 |