aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisper.go
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisper.go')
-rw-r--r--whisper/whisper.go77
1 files changed, 39 insertions, 38 deletions
diff --git a/whisper/whisper.go b/whisper/whisper.go
index f51f14a9f..e56c45786 100644
--- a/whisper/whisper.go
+++ b/whisper/whisper.go
@@ -16,8 +16,8 @@ import (
)
const (
- statusMsg = 0x00
- envelopesMsg = 0x01
+ statusCode = 0x00
+ messagesCode = 0x01
protocolVersion uint64 = 0x02
protocolName = "shh"
@@ -25,7 +25,8 @@ const (
signatureFlag = byte(1 << 7)
signatureLength = 65
- expirationTicks = 800 * time.Millisecond
+ expirationTicks = 800 * time.Millisecond
+ transmissionTicks = 300 * time.Millisecond
)
const (
@@ -69,7 +70,7 @@ func New() *Whisper {
Name: protocolName,
Version: uint(protocolVersion),
Length: 2,
- Run: whisper.msgHandler,
+ Run: whisper.handlePeer,
}
return whisper
@@ -168,6 +169,40 @@ func (self *Whisper) Stop() {
return
}*/
+// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
+// connection is negotiated.
+func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ // Create, initialize and start the whisper peer
+ whisperPeer, err := newPeer(self, peer, rw)
+ if err != nil {
+ return err
+ }
+ whisperPeer.start()
+ defer whisperPeer.stop()
+
+ // Read and process inbound messages directly to merge into client-global state
+ for {
+ // Fetch the next packet and decode the contained envelopes
+ packet, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ var envelopes []*Envelope
+ if err := packet.Decode(&envelopes); err != nil {
+ peer.Infof("failed to decode enveloped: %v", err)
+ continue
+ }
+ // Inject all envelopes into the internal pool
+ for _, envelope := range envelopes {
+ if err := self.add(envelope); err != nil {
+ // TODO Punish peer here. Invalid envelope.
+ peer.Debugf("failed to pool envelope: %f", err)
+ }
+ whisperPeer.mark(envelope)
+ }
+ }
+}
+
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp.
@@ -198,40 +233,6 @@ func (self *Whisper) add(envelope *Envelope) error {
return nil
}
-// Main handler for passing whisper messages to whisper peer objects
-func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
- wpeer := NewPeer(self, peer, ws)
- // initialise whisper peer (handshake/status)
- if err := wpeer.init(); err != nil {
- return err
- }
- // kick of the main handler for broadcasting/managing envelopes
- go wpeer.start()
- defer wpeer.stop()
-
- // Main *read* loop. Writing is done by the peer it self.
- for {
- msg, err := ws.ReadMsg()
- if err != nil {
- return err
- }
-
- var envelopes []*Envelope
- if err := msg.Decode(&envelopes); err != nil {
- peer.Infoln(err)
- continue
- }
-
- for _, envelope := range envelopes {
- if err := self.add(envelope); err != nil {
- // TODO Punish peer here. Invalid envelope.
- peer.Debugln(err)
- }
- wpeer.addKnown(envelope)
- }
- }
-}
-
// postEvent opens an envelope with the configured identities and delivers the
// message upstream from application processing.
func (self *Whisper) postEvent(envelope *Envelope) {