aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv6/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisperv6/peer.go')
-rw-r--r--whisper/whisperv6/peer.go151
1 files changed, 106 insertions, 45 deletions
diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go
index ac7b3b12b..4f9a7c378 100644
--- a/whisper/whisperv6/peer.go
+++ b/whisper/whisperv6/peer.go
@@ -18,6 +18,7 @@ package whisperv6
import (
"fmt"
+ "math"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -27,12 +28,15 @@ import (
set "gopkg.in/fatih/set.v0"
)
-// peer represents a whisper protocol peer connection.
+// Peer represents a whisper protocol peer connection.
type Peer struct {
- host *Whisper
- peer *p2p.Peer
- ws p2p.MsgReadWriter
- trusted bool
+ host *Whisper
+ peer *p2p.Peer
+ ws p2p.MsgReadWriter
+
+ trusted bool
+ powRequirement float64
+ bloomFilter []byte // may contain nil in case of full node
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
@@ -42,62 +46,95 @@ type Peer struct {
// newPeer creates a new whisper peer object, but does not run the handshake itself.
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
return &Peer{
- host: host,
- peer: remote,
- ws: rw,
- trusted: false,
- known: set.New(),
- quit: make(chan struct{}),
+ host: host,
+ peer: remote,
+ ws: rw,
+ trusted: false,
+ powRequirement: 0.0,
+ known: set.New(),
+ quit: make(chan struct{}),
}
}
// start initiates the peer updater, periodically broadcasting the whisper packets
// into the network.
-func (p *Peer) start() {
- go p.update()
- log.Trace("start", "peer", p.ID())
+func (peer *Peer) start() {
+ go peer.update()
+ log.Trace("start", "peer", peer.ID())
}
// stop terminates the peer updater, stopping message forwarding to it.
-func (p *Peer) stop() {
- close(p.quit)
- log.Trace("stop", "peer", p.ID())
+func (peer *Peer) stop() {
+ close(peer.quit)
+ log.Trace("stop", "peer", peer.ID())
}
// handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too.
-func (p *Peer) handshake() error {
+func (peer *Peer) handshake() error {
// Send the handshake status message asynchronously
errc := make(chan error, 1)
go func() {
- errc <- p2p.Send(p.ws, statusCode, ProtocolVersion)
+ pow := peer.host.MinPow()
+ powConverted := math.Float64bits(pow)
+ bloom := peer.host.BloomFilter()
+ errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom)
}()
+
// Fetch the remote status packet and verify protocol match
- packet, err := p.ws.ReadMsg()
+ packet, err := peer.ws.ReadMsg()
if err != nil {
return err
}
if packet.Code != statusCode {
- return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code)
+ return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
}
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
+ _, err = s.List()
+ if err != nil {
+ return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
+ }
peerVersion, err := s.Uint()
if err != nil {
- return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err)
+ return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
}
if peerVersion != ProtocolVersion {
- return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion)
+ return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
+ }
+
+ // only version is mandatory, subsequent parameters are optional
+ powRaw, err := s.Uint()
+ if err == nil {
+ pow := math.Float64frombits(powRaw)
+ if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
+ return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
+ }
+ peer.powRequirement = pow
+
+ var bloom []byte
+ err = s.Decode(&bloom)
+ if err == nil {
+ sz := len(bloom)
+ if sz != bloomFilterSize && sz != 0 {
+ return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
+ }
+ if isFullNode(bloom) {
+ peer.bloomFilter = nil
+ } else {
+ peer.bloomFilter = bloom
+ }
+ }
}
- // Wait until out own status is consumed too
+
if err := <-errc; err != nil {
- return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err)
+ return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
}
return nil
}
// update executes periodic operations on the peer, including message transmission
// and expiration.
-func (p *Peer) update() {
+func (peer *Peer) update() {
// Start the tickers for the updates
expire := time.NewTicker(expirationCycle)
transmit := time.NewTicker(transmissionCycle)
@@ -106,15 +143,15 @@ func (p *Peer) update() {
for {
select {
case <-expire.C:
- p.expire()
+ peer.expire()
case <-transmit.C:
- if err := p.broadcast(); err != nil {
- log.Trace("broadcast failed", "reason", err, "peer", p.ID())
+ if err := peer.broadcast(); err != nil {
+ log.Trace("broadcast failed", "reason", err, "peer", peer.ID())
return
}
- case <-p.quit:
+ case <-peer.quit:
return
}
}
@@ -148,27 +185,51 @@ func (peer *Peer) expire() {
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
-func (p *Peer) broadcast() error {
- var cnt int
- envelopes := p.host.Envelopes()
+func (peer *Peer) broadcast() error {
+ envelopes := peer.host.Envelopes()
+ bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
- if !p.marked(envelope) {
- err := p2p.Send(p.ws, messagesCode, envelope)
- if err != nil {
- return err
- } else {
- p.mark(envelope)
- cnt++
- }
+ if !peer.marked(envelope) && envelope.PoW() >= peer.powRequirement && peer.bloomMatch(envelope) {
+ bundle = append(bundle, envelope)
}
}
- if cnt > 0 {
- log.Trace("broadcast", "num. messages", cnt)
+
+ if len(bundle) > 0 {
+ // transmit the batch of envelopes
+ if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil {
+ return err
+ }
+
+ // mark envelopes only if they were successfully sent
+ for _, e := range bundle {
+ peer.mark(e)
+ }
+
+ log.Trace("broadcast", "num. messages", len(bundle))
}
return nil
}
-func (p *Peer) ID() []byte {
- id := p.peer.ID()
+// ID returns a peer's id
+func (peer *Peer) ID() []byte {
+ id := peer.peer.ID()
return id[:]
}
+
+func (peer *Peer) notifyAboutPowRequirementChange(pow float64) error {
+ i := math.Float64bits(pow)
+ return p2p.Send(peer.ws, powRequirementCode, i)
+}
+
+func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
+ return p2p.Send(peer.ws, bloomFilterExCode, bloom)
+}
+
+func (peer *Peer) bloomMatch(env *Envelope) bool {
+ if peer.bloomFilter == nil {
+ // no filter - full node, accepts all envelops
+ return true
+ }
+
+ return bloomFilterMatch(peer.bloomFilter, env.Bloom())
+}