diff options
Diffstat (limited to 'whisper/whisperv6/peer.go')
-rw-r--r-- | whisper/whisperv6/peer.go | 151 |
1 files changed, 106 insertions, 45 deletions
diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index ac7b3b12b..4f9a7c378 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -18,6 +18,7 @@ package whisperv6 import ( "fmt" + "math" "time" "github.com/ethereum/go-ethereum/common" @@ -27,12 +28,15 @@ import ( set "gopkg.in/fatih/set.v0" ) -// peer represents a whisper protocol peer connection. +// Peer represents a whisper protocol peer connection. type Peer struct { - host *Whisper - peer *p2p.Peer - ws p2p.MsgReadWriter - trusted bool + host *Whisper + peer *p2p.Peer + ws p2p.MsgReadWriter + + trusted bool + powRequirement float64 + bloomFilter []byte // may contain nil in case of full node known *set.Set // Messages already known by the peer to avoid wasting bandwidth @@ -42,62 +46,95 @@ type Peer struct { // newPeer creates a new whisper peer object, but does not run the handshake itself. func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer { return &Peer{ - host: host, - peer: remote, - ws: rw, - trusted: false, - known: set.New(), - quit: make(chan struct{}), + host: host, + peer: remote, + ws: rw, + trusted: false, + powRequirement: 0.0, + known: set.New(), + quit: make(chan struct{}), } } // start initiates the peer updater, periodically broadcasting the whisper packets // into the network. -func (p *Peer) start() { - go p.update() - log.Trace("start", "peer", p.ID()) +func (peer *Peer) start() { + go peer.update() + log.Trace("start", "peer", peer.ID()) } // stop terminates the peer updater, stopping message forwarding to it. -func (p *Peer) stop() { - close(p.quit) - log.Trace("stop", "peer", p.ID()) +func (peer *Peer) stop() { + close(peer.quit) + log.Trace("stop", "peer", peer.ID()) } // handshake sends the protocol initiation status message to the remote peer and // verifies the remote status too. -func (p *Peer) handshake() error { +func (peer *Peer) handshake() error { // Send the handshake status message asynchronously errc := make(chan error, 1) go func() { - errc <- p2p.Send(p.ws, statusCode, ProtocolVersion) + pow := peer.host.MinPow() + powConverted := math.Float64bits(pow) + bloom := peer.host.BloomFilter() + errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom) }() + // Fetch the remote status packet and verify protocol match - packet, err := p.ws.ReadMsg() + packet, err := peer.ws.ReadMsg() if err != nil { return err } if packet.Code != statusCode { - return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code) + return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code) } s := rlp.NewStream(packet.Payload, uint64(packet.Size)) + _, err = s.List() + if err != nil { + return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err) + } peerVersion, err := s.Uint() if err != nil { - return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err) + return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err) } if peerVersion != ProtocolVersion { - return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion) + return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion) + } + + // only version is mandatory, subsequent parameters are optional + powRaw, err := s.Uint() + if err == nil { + pow := math.Float64frombits(powRaw) + if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 { + return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID()) + } + peer.powRequirement = pow + + var bloom []byte + err = s.Decode(&bloom) + if err == nil { + sz := len(bloom) + if sz != bloomFilterSize && sz != 0 { + return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz) + } + if isFullNode(bloom) { + peer.bloomFilter = nil + } else { + peer.bloomFilter = bloom + } + } } - // Wait until out own status is consumed too + if err := <-errc; err != nil { - return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err) + return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err) } return nil } // update executes periodic operations on the peer, including message transmission // and expiration. -func (p *Peer) update() { +func (peer *Peer) update() { // Start the tickers for the updates expire := time.NewTicker(expirationCycle) transmit := time.NewTicker(transmissionCycle) @@ -106,15 +143,15 @@ func (p *Peer) update() { for { select { case <-expire.C: - p.expire() + peer.expire() case <-transmit.C: - if err := p.broadcast(); err != nil { - log.Trace("broadcast failed", "reason", err, "peer", p.ID()) + if err := peer.broadcast(); err != nil { + log.Trace("broadcast failed", "reason", err, "peer", peer.ID()) return } - case <-p.quit: + case <-peer.quit: return } } @@ -148,27 +185,51 @@ func (peer *Peer) expire() { // broadcast iterates over the collection of envelopes and transmits yet unknown // ones over the network. -func (p *Peer) broadcast() error { - var cnt int - envelopes := p.host.Envelopes() +func (peer *Peer) broadcast() error { + envelopes := peer.host.Envelopes() + bundle := make([]*Envelope, 0, len(envelopes)) for _, envelope := range envelopes { - if !p.marked(envelope) { - err := p2p.Send(p.ws, messagesCode, envelope) - if err != nil { - return err - } else { - p.mark(envelope) - cnt++ - } + if !peer.marked(envelope) && envelope.PoW() >= peer.powRequirement && peer.bloomMatch(envelope) { + bundle = append(bundle, envelope) } } - if cnt > 0 { - log.Trace("broadcast", "num. messages", cnt) + + if len(bundle) > 0 { + // transmit the batch of envelopes + if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil { + return err + } + + // mark envelopes only if they were successfully sent + for _, e := range bundle { + peer.mark(e) + } + + log.Trace("broadcast", "num. messages", len(bundle)) } return nil } -func (p *Peer) ID() []byte { - id := p.peer.ID() +// ID returns a peer's id +func (peer *Peer) ID() []byte { + id := peer.peer.ID() return id[:] } + +func (peer *Peer) notifyAboutPowRequirementChange(pow float64) error { + i := math.Float64bits(pow) + return p2p.Send(peer.ws, powRequirementCode, i) +} + +func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error { + return p2p.Send(peer.ws, bloomFilterExCode, bloom) +} + +func (peer *Peer) bloomMatch(env *Envelope) bool { + if peer.bloomFilter == nil { + // no filter - full node, accepts all envelops + return true + } + + return bloomFilterMatch(peer.bloomFilter, env.Bloom()) +} |