diff options
Diffstat (limited to 'whisper/whisper.go')
-rw-r--r-- | whisper/whisper.go | 77 |
1 files changed, 39 insertions, 38 deletions
diff --git a/whisper/whisper.go b/whisper/whisper.go index f51f14a9f..e56c45786 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -16,8 +16,8 @@ import ( ) const ( - statusMsg = 0x00 - envelopesMsg = 0x01 + statusCode = 0x00 + messagesCode = 0x01 protocolVersion uint64 = 0x02 protocolName = "shh" @@ -25,7 +25,8 @@ const ( signatureFlag = byte(1 << 7) signatureLength = 65 - expirationTicks = 800 * time.Millisecond + expirationTicks = 800 * time.Millisecond + transmissionTicks = 300 * time.Millisecond ) const ( @@ -69,7 +70,7 @@ func New() *Whisper { Name: protocolName, Version: uint(protocolVersion), Length: 2, - Run: whisper.msgHandler, + Run: whisper.handlePeer, } return whisper @@ -168,6 +169,40 @@ func (self *Whisper) Stop() { return }*/ +// handlePeer is called by the underlying P2P layer when the whisper sub-protocol +// connection is negotiated. +func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + // Create, initialize and start the whisper peer + whisperPeer, err := newPeer(self, peer, rw) + if err != nil { + return err + } + whisperPeer.start() + defer whisperPeer.stop() + + // Read and process inbound messages directly to merge into client-global state + for { + // Fetch the next packet and decode the contained envelopes + packet, err := rw.ReadMsg() + if err != nil { + return err + } + var envelopes []*Envelope + if err := packet.Decode(&envelopes); err != nil { + peer.Infof("failed to decode enveloped: %v", err) + continue + } + // Inject all envelopes into the internal pool + for _, envelope := range envelopes { + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + peer.Debugf("failed to pool envelope: %f", err) + } + whisperPeer.mark(envelope) + } + } +} + // add inserts a new envelope into the message pool to be distributed within the // whisper network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. @@ -198,40 +233,6 @@ func (self *Whisper) add(envelope *Envelope) error { return nil } -// Main handler for passing whisper messages to whisper peer objects -func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { - wpeer := NewPeer(self, peer, ws) - // initialise whisper peer (handshake/status) - if err := wpeer.init(); err != nil { - return err - } - // kick of the main handler for broadcasting/managing envelopes - go wpeer.start() - defer wpeer.stop() - - // Main *read* loop. Writing is done by the peer it self. - for { - msg, err := ws.ReadMsg() - if err != nil { - return err - } - - var envelopes []*Envelope - if err := msg.Decode(&envelopes); err != nil { - peer.Infoln(err) - continue - } - - for _, envelope := range envelopes { - if err := self.add(envelope); err != nil { - // TODO Punish peer here. Invalid envelope. - peer.Debugln(err) - } - wpeer.addKnown(envelope) - } - } -} - // postEvent opens an envelope with the configured identities and delivers the // message upstream from application processing. func (self *Whisper) postEvent(envelope *Envelope) { |