From e3cad04decbbc83a0c956850717cb0ae0b2b3eec Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 13 Jan 2015 13:36:44 +0100 Subject: Fixed whisper messages * Whisper protocol wasn't properly suppling envelope slices * Message history wasn't properly propagated * Added 'Messages' method, filtering any current envelope with the supplied filter. --- whisper/envelope.go | 42 ++++++++++++++++++++++++------------------ whisper/message.go | 6 +++++- whisper/whisper.go | 22 ++++++++++++++-------- 3 files changed, 43 insertions(+), 27 deletions(-) (limited to 'whisper') diff --git a/whisper/envelope.go b/whisper/envelope.go index 9d28dfa6b..3c477ad9f 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -1,11 +1,9 @@ package whisper import ( - "bytes" "crypto/ecdsa" "encoding/binary" "fmt" - "io" "time" "github.com/ethereum/go-ethereum/crypto" @@ -28,22 +26,6 @@ type Envelope struct { hash Hash } -func NewEnvelopeFromReader(reader io.Reader) (*Envelope, error) { - var envelope Envelope - - buf := new(bytes.Buffer) - buf.ReadFrom(reader) - - h := H(crypto.Sha3(buf.Bytes())) - if err := rlp.Decode(buf, &envelope); err != nil { - return nil, err - } - - envelope.hash = h - - return &envelope, nil -} - func (self *Envelope) Hash() Hash { if self.hash == EmptyHash { self.hash = H(crypto.Sha3(ethutil.Encode(self))) @@ -126,3 +108,27 @@ func (self *Envelope) withoutNonce() interface{} { func (self *Envelope) RlpData() interface{} { return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data, self.Nonce} } + +func (self *Envelope) DecodeRLP(s *rlp.Stream) error { + var extenv struct { + Expiry uint32 + Ttl uint32 + Topics [][]byte + Data []byte + Nonce uint32 + } + if err := s.Decode(&extenv); err != nil { + return err + } + + self.Expiry = extenv.Expiry + self.Ttl = extenv.Ttl + self.Topics = extenv.Topics + self.Data = extenv.Data + self.Nonce = extenv.Nonce + + // TODO We should use the stream directly here. + self.hash = H(crypto.Sha3(ethutil.Encode(self))) + + return nil +} diff --git a/whisper/message.go b/whisper/message.go index db0110b4a..bbad8e6a3 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -67,7 +67,11 @@ func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { } } - envelope := NewEnvelope(DefaultTtl, opts.Topics, self) + if opts.Ttl == 0 { + opts.Ttl = DefaultTtl + } + + envelope := NewEnvelope(opts.Ttl, opts.Topics, self) envelope.Seal(pow) return envelope, nil diff --git a/whisper/whisper.go b/whisper/whisper.go index bdc69f199..ece2dd6d4 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -126,18 +126,20 @@ func (self *Whisper) Watch(opts Filter) int { }) } -func (self *Whisper) Trigger(id int) { +func (self *Whisper) Messages(id int) (messages []*Message) { filter := self.filters.Get(id) if filter != nil { for _, e := range self.messages { if msg, key := self.open(e); msg != nil { f := createFilter(msg, e.Topics, key) if self.filters.Match(filter, f) { - self.filters.Notify(f, msg) + messages = append(messages, msg) } } } } + + return } // Main handler for passing whisper messages to whisper peer objects @@ -158,17 +160,19 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { return err } - envelope, err := NewEnvelopeFromReader(msg.Payload) - if err != nil { + var envelopes []*Envelope + if err := msg.Decode(&envelopes); err != nil { peer.Infoln(err) continue } - if err := self.add(envelope); err != nil { - // TODO Punish peer here. Invalid envelope. - peer.Infoln(err) + for _, envelope := range envelopes { + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + peer.Infoln(err) + } + wpeer.addKnown(envelope) } - wpeer.addKnown(envelope) } } @@ -192,6 +196,8 @@ func (self *Whisper) add(envelope *Envelope) error { go self.postEvent(envelope) } + wlogger.DebugDetailln("added whisper message") + return nil } -- cgit