aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv6/whisper.go
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisperv6/whisper.go')
-rw-r--r--whisper/whisperv6/whisper.go86
1 files changed, 73 insertions, 13 deletions
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index 2cc1e64f5..492591486 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -22,6 +22,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"fmt"
+ "math"
"runtime"
"sync"
"time"
@@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2"
@@ -74,6 +76,8 @@ type Whisper struct {
settings syncmap.Map // holds configuration settings that can be dynamically changed
+ reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages
+
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
@@ -87,14 +91,15 @@ func New(cfg *Config) *Whisper {
}
whisper := &Whisper{
- privateKeys: make(map[string]*ecdsa.PrivateKey),
- symKeys: make(map[string][]byte),
- envelopes: make(map[common.Hash]*Envelope),
- expirations: make(map[uint32]*set.SetNonTS),
- peers: make(map[*Peer]struct{}),
- messageQueue: make(chan *Envelope, messageQueueLimit),
- p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
- quit: make(chan struct{}),
+ privateKeys: make(map[string]*ecdsa.PrivateKey),
+ symKeys: make(map[string][]byte),
+ envelopes: make(map[common.Hash]*Envelope),
+ expirations: make(map[uint32]*set.SetNonTS),
+ peers: make(map[*Peer]struct{}),
+ messageQueue: make(chan *Envelope, messageQueueLimit),
+ p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
+ quit: make(chan struct{}),
+ reactionAllowance: SynchAllowance,
}
whisper.filters = NewFilters(whisper)
@@ -177,13 +182,50 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
// SetMinimumPoW sets the minimal PoW required by this node
func (w *Whisper) SetMinimumPoW(val float64) error {
- if val <= 0.0 {
+ if val < 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
- w.settings.Store(minPowIdx, val)
+
+ w.notifyPeersAboutPowRequirementChange(val)
+
+ go func() {
+ // allow some time before all the peers have processed the notification
+ time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
+ w.settings.Store(minPowIdx, val)
+ }()
+
return nil
}
+// SetMinimumPoW sets the minimal PoW in test environment
+func (w *Whisper) SetMinimumPowTest(val float64) {
+ w.notifyPeersAboutPowRequirementChange(val)
+ w.settings.Store(minPowIdx, val)
+}
+
+func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
+ arr := make([]*Peer, len(w.peers))
+ i := 0
+
+ w.peerMu.Lock()
+ for p := range w.peers {
+ arr[i] = p
+ i++
+ }
+ w.peerMu.Unlock()
+
+ for _, p := range arr {
+ err := p.notifyAboutPowRequirementChange(pow)
+ if err != nil {
+ // allow one retry
+ err = p.notifyAboutPowRequirementChange(pow)
+ }
+ if err != nil {
+ log.Warn("oversized message received", "peer", p.ID(), "error", err)
+ }
+ }
+}
+
// getPeer retrieves peer by ID
func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
w.peerMu.Lock()
@@ -233,7 +275,7 @@ func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
// SendP2PDirect sends a peer-to-peer message to a specific peer.
func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
- return p2p.Send(peer.ws, p2pCode, envelope)
+ return p2p.Send(peer.ws, p2pMessageCode, envelope)
}
// NewKeyPair generates a new cryptographic identity for the client, and injects
@@ -536,7 +578,22 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if trouble {
return errors.New("invalid envelope")
}
- case p2pCode:
+ case powRequirementCode:
+ s := rlp.NewStream(packet.Payload, uint64(packet.Size))
+ i, err := s.Uint()
+ if err != nil {
+ log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid powRequirementCode message")
+ }
+ f := math.Float64frombits(i)
+ if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
+ log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid value in powRequirementCode message")
+ }
+ p.powRequirement = f
+ case bloomFilterExCode:
+ // to be implemented
+ case p2pMessageCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and
// therefore might not satisfy the PoW, expiry and other requirements.
@@ -599,7 +656,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
if envelope.PoW() < wh.MinPow() {
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
- return false, nil // drop envelope without error
+ return false, nil // drop envelope without error for now
+
+ // once the status message includes the PoW requirement, an error should be returned here:
+ //return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
}
hash := envelope.Hash()