diff options
Diffstat (limited to 'whisper/whisperv6/whisper.go')
-rw-r--r-- | whisper/whisperv6/whisper.go | 242 |
1 files changed, 206 insertions, 36 deletions
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 492591486..bc89aadcc 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -48,9 +48,12 @@ type Statistics struct { } const ( - minPowIdx = iota // Minimal PoW required by the whisper node - maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node - overflowIdx = iota // Indicator of message queue overflow + maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node + overflowIdx // Indicator of message queue overflow + minPowIdx // Minimal PoW required by the whisper node + minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time + bloomFilterIdx // Bloom filter for topics of interest for this node + bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time ) // Whisper represents a dark communication interface through the Ethereum @@ -76,7 +79,7 @@ type Whisper struct { settings syncmap.Map // holds configuration settings that can be dynamically changed - reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages + syncAllowance int // maximum time in seconds allowed to process the whisper-related messages statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node @@ -91,15 +94,15 @@ func New(cfg *Config) *Whisper { } whisper := &Whisper{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - expirations: make(map[uint32]*set.SetNonTS), - peers: make(map[*Peer]struct{}), - messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), - quit: make(chan struct{}), - reactionAllowance: SynchAllowance, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*Peer]struct{}), + messageQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + quit: make(chan struct{}), + syncAllowance: DefaultSyncAllowance, } whisper.filters = NewFilters(whisper) @@ -126,11 +129,55 @@ func New(cfg *Config) *Whisper { return whisper } +// MinPow returns the PoW value required by this node. func (w *Whisper) MinPow() float64 { - val, _ := w.settings.Load(minPowIdx) + val, exist := w.settings.Load(minPowIdx) + if !exist || val == nil { + return DefaultMinimumPoW + } + v, ok := val.(float64) + if !ok { + log.Error("Error loading minPowIdx, using default") + return DefaultMinimumPoW + } + return v +} + +// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited +// time after PoW was changed. If sufficient time have elapsed or no change of PoW +// have ever occurred, the return value will be the same as return value of MinPow(). +func (w *Whisper) MinPowTolerance() float64 { + val, exist := w.settings.Load(minPowToleranceIdx) + if !exist || val == nil { + return DefaultMinimumPoW + } return val.(float64) } +// BloomFilter returns the aggregated bloom filter for all the topics of interest. +// The nodes are required to send only messages that match the advertised bloom filter. +// If a message does not match the bloom, it will tantamount to spam, and the peer will +// be disconnected. +func (w *Whisper) BloomFilter() []byte { + val, exist := w.settings.Load(bloomFilterIdx) + if !exist || val == nil { + return nil + } + return val.([]byte) +} + +// BloomFilterTolerance returns the bloom filter which is tolerated for a limited +// time after new bloom was advertised to the peers. If sufficient time have elapsed +// or no change of bloom filter have ever occurred, the return value will be the same +// as return value of BloomFilter(). +func (w *Whisper) BloomFilterTolerance() []byte { + val, exist := w.settings.Load(bloomFilterToleranceIdx) + if !exist || val == nil { + return nil + } + return val.([]byte) +} + // MaxMessageSize returns the maximum accepted message size. func (w *Whisper) MaxMessageSize() uint32 { val, _ := w.settings.Load(maxMsgSizeIdx) @@ -180,18 +227,40 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error { return nil } +// SetBloomFilter sets the new bloom filter +func (w *Whisper) SetBloomFilter(bloom []byte) error { + if len(bloom) != bloomFilterSize { + return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) + } + + b := make([]byte, bloomFilterSize) + copy(b, bloom) + + w.settings.Store(bloomFilterIdx, b) + w.notifyPeersAboutBloomFilterChange(b) + + go func() { + // allow some time before all the peers have processed the notification + time.Sleep(time.Duration(w.syncAllowance) * time.Second) + w.settings.Store(bloomFilterToleranceIdx, b) + }() + + return nil +} + // SetMinimumPoW sets the minimal PoW required by this node func (w *Whisper) SetMinimumPoW(val float64) error { if val < 0.0 { return fmt.Errorf("invalid PoW: %f", val) } + w.settings.Store(minPowIdx, val) w.notifyPeersAboutPowRequirementChange(val) go func() { // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(w.reactionAllowance) * time.Second) - w.settings.Store(minPowIdx, val) + time.Sleep(time.Duration(w.syncAllowance) * time.Second) + w.settings.Store(minPowToleranceIdx, val) }() return nil @@ -199,21 +268,13 @@ func (w *Whisper) SetMinimumPoW(val float64) error { // SetMinimumPoW sets the minimal PoW in test environment func (w *Whisper) SetMinimumPowTest(val float64) { - w.notifyPeersAboutPowRequirementChange(val) w.settings.Store(minPowIdx, val) + w.notifyPeersAboutPowRequirementChange(val) + w.settings.Store(minPowToleranceIdx, val) } func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { - arr := make([]*Peer, len(w.peers)) - i := 0 - - w.peerMu.Lock() - for p := range w.peers { - arr[i] = p - i++ - } - w.peerMu.Unlock() - + arr := w.getPeers() for _, p := range arr { err := p.notifyAboutPowRequirementChange(pow) if err != nil { @@ -221,11 +282,37 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { err = p.notifyAboutPowRequirementChange(pow) } if err != nil { - log.Warn("oversized message received", "peer", p.ID(), "error", err) + log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err) + } + } +} + +func (w *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) { + arr := w.getPeers() + for _, p := range arr { + err := p.notifyAboutBloomFilterChange(bloom) + if err != nil { + // allow one retry + err = p.notifyAboutBloomFilterChange(bloom) + } + if err != nil { + log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err) } } } +func (w *Whisper) getPeers() []*Peer { + arr := make([]*Peer, len(w.peers)) + i := 0 + w.peerMu.Lock() + for p := range w.peers { + arr[i] = p + i++ + } + w.peerMu.Unlock() + return arr +} + // getPeer retrieves peer by ID func (w *Whisper) getPeer(peerID []byte) (*Peer, error) { w.peerMu.Lock() @@ -459,7 +546,28 @@ func (w *Whisper) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Whisper) Subscribe(f *Filter) (string, error) { - return w.filters.Install(f) + s, err := w.filters.Install(f) + if err == nil { + w.updateBloomFilter(f) + } + return s, err +} + +// updateBloomFilter recalculates the new value of bloom filter, +// and informs the peers if necessary. +func (w *Whisper) updateBloomFilter(f *Filter) { + aggregate := make([]byte, bloomFilterSize) + for _, t := range f.Topics { + top := BytesToTopic(t) + b := TopicToBloom(top) + aggregate = addBloom(aggregate, b) + } + + if !bloomFilterMatch(w.BloomFilter(), aggregate) { + // existing bloom filter must be updated + aggregate = addBloom(w.BloomFilter(), aggregate) + w.SetBloomFilter(aggregate) + } } // GetFilter returns the filter by id. @@ -592,7 +700,21 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } p.powRequirement = f case bloomFilterExCode: - // to be implemented + var bloom []byte + err := packet.Decode(&bloom) + if err == nil && len(bloom) != bloomFilterSize { + err = fmt.Errorf("wrong bloom filter size %d", len(bloom)) + } + + if err != nil { + log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid bloom filter exchange message") + } + if isFullNode(bloom) { + p.bloomFilter = nil + } else { + p.bloomFilter = bloom + } case p2pMessageCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and @@ -633,7 +755,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { sent := envelope.Expiry - envelope.TTL if sent > now { - if sent-SynchAllowance > now { + if sent-DefaultSyncAllowance > now { return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) } else { // recalculate PoW, adjusted for the time difference, plus one second for latency @@ -642,7 +764,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { } if envelope.Expiry < now { - if envelope.Expiry+SynchAllowance*2 < now { + if envelope.Expiry+DefaultSyncAllowance*2 < now { return false, fmt.Errorf("very old message") } else { log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) @@ -655,11 +777,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { } if envelope.PoW() < wh.MinPow() { - log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) - return false, nil // drop envelope without error for now + // maybe the value was recently changed, and the peers did not adjust yet. + // in this case the previous value is retrieved by MinPowTolerance() + // for a short period of peer synchronization. + if envelope.PoW() < wh.MinPowTolerance() { + return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + } + } - // once the status message includes the PoW requirement, an error should be returned here: - //return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + if !bloomFilterMatch(wh.BloomFilter(), envelope.Bloom()) { + // maybe the value was recently changed, and the peers did not adjust yet. + // in this case the previous value is retrieved by BloomFilterTolerance() + // for a short period of peer synchronization. + if !bloomFilterMatch(wh.BloomFilterTolerance(), envelope.Bloom()) { + return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x", + envelope.Hash().Hex(), wh.BloomFilter(), envelope.Bloom(), envelope.Topic) + } } hash := envelope.Hash() @@ -897,3 +1030,40 @@ func GenerateRandomID() (id string, err error) { id = common.Bytes2Hex(buf) return id, err } + +func isFullNode(bloom []byte) bool { + if bloom == nil { + return true + } + for _, b := range bloom { + if b != 255 { + return false + } + } + return true +} + +func bloomFilterMatch(filter, sample []byte) bool { + if filter == nil { + // full node, accepts all messages + return true + } + + for i := 0; i < bloomFilterSize; i++ { + f := filter[i] + s := sample[i] + if (f | s) != f { + return false + } + } + + return true +} + +func addBloom(a, b []byte) []byte { + c := make([]byte, bloomFilterSize) + for i := 0; i < bloomFilterSize; i++ { + c[i] = a[i] | b[i] + } + return c +} |