diff options
Diffstat (limited to 'whisper/whisperv6/whisper.go')
-rw-r--r-- | whisper/whisperv6/whisper.go | 86 |
1 files changed, 73 insertions, 13 deletions
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 2cc1e64f5..492591486 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -22,6 +22,7 @@ import ( crand "crypto/rand" "crypto/sha256" "fmt" + "math" "runtime" "sync" "time" @@ -30,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/syndtr/goleveldb/leveldb/errors" "golang.org/x/crypto/pbkdf2" @@ -74,6 +76,8 @@ type Whisper struct { settings syncmap.Map // holds configuration settings that can be dynamically changed + reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages + statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node @@ -87,14 +91,15 @@ func New(cfg *Config) *Whisper { } whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - expirations: make(map[uint32]*set.SetNonTS), - peers: make(map[*Peer]struct{}), - messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), - quit: make(chan struct{}), + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), + reactionAllowance: SynchAllowance, } whisper.filters = NewFilters(whisper) @@ -177,13 +182,50 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error { // SetMinimumPoW sets the minimal PoW required by this node func (w *Whisper) SetMinimumPoW(val float64) error { - if val <= 0.0 { + if val < 0.0 { return fmt.Errorf("invalid PoW: %f", val) } - w.settings.Store(minPowIdx, val) + + w.notifyPeersAboutPowRequirementChange(val) + + go func() { + // allow some time before all the peers have processed the notification + time.Sleep(time.Duration(w.reactionAllowance) * time.Second) + w.settings.Store(minPowIdx, val) + }() + return nil } +// SetMinimumPoW sets the minimal PoW in test environment +func (w *Whisper) SetMinimumPowTest(val float64) { + w.notifyPeersAboutPowRequirementChange(val) + w.settings.Store(minPowIdx, val) +} + +func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { + arr := make([]*Peer, len(w.peers)) + i := 0 + + w.peerMu.Lock() + for p := range w.peers { + arr[i] = p + i++ + } + w.peerMu.Unlock() + + for _, p := range arr { + err := p.notifyAboutPowRequirementChange(pow) + if err != nil { + // allow one retry + err = p.notifyAboutPowRequirementChange(pow) + } + if err != nil { + log.Warn("oversized message received", "peer", p.ID(), "error", err) + } + } +} + // getPeer retrieves peer by ID func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { w.peerMu.Lock() @@ -233,7 +275,7 @@ func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { // SendP2PDirect sends a peer-to-peer message to a specific peer. func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error { - return p2p.Send(peer.ws, p2pCode, envelope) + return p2p.Send(peer.ws, p2pMessageCode, envelope) } // NewKeyPair generates a new cryptographic identity for the client, and injects @@ -536,7 +578,22 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { if trouble { return errors.New("invalid envelope") } - case p2pCode: + case powRequirementCode: + s := rlp.NewStream(packet.Payload, uint64(packet.Size)) + i, err := s.Uint() + if err != nil { + log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid powRequirementCode message") + } + f := math.Float64frombits(i) + if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 { + log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid value in powRequirementCode message") + } + p.powRequirement = f + case bloomFilterExCode: + // to be implemented + case p2pMessageCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and // therefore might not satisfy the PoW, expiry and other requirements. @@ -599,7 +656,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { if envelope.PoW() < wh.MinPow() { log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error + return false, nil // drop envelope without error for now + + // once the status message includes the PoW requirement, an error should be returned here: + //return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) } hash := envelope.Hash() |