aboutsummaryrefslogtreecommitdiffstats
path: root/whisper
diff options
context:
space:
mode:
Diffstat (limited to 'whisper')
-rw-r--r--whisper/mailserver/mailserver.go2
-rw-r--r--whisper/mailserver/server_test.go2
-rw-r--r--whisper/whisperv2/whisper.go4
-rw-r--r--whisper/whisperv5/api.go2
-rw-r--r--whisper/whisperv5/filter.go8
-rw-r--r--whisper/whisperv5/filter_test.go48
-rw-r--r--whisper/whisperv5/gen_criteria_json.go14
-rw-r--r--whisper/whisperv5/gen_message_json.go28
-rw-r--r--whisper/whisperv5/gen_newmessage_json.go26
-rw-r--r--whisper/whisperv6/api.go20
-rw-r--r--whisper/whisperv6/config.go2
-rw-r--r--whisper/whisperv6/doc.go35
-rw-r--r--whisper/whisperv6/envelope.go62
-rw-r--r--whisper/whisperv6/filter.go29
-rw-r--r--whisper/whisperv6/filter_test.go50
-rw-r--r--whisper/whisperv6/gen_criteria_json.go16
-rw-r--r--whisper/whisperv6/gen_message_json.go30
-rw-r--r--whisper/whisperv6/gen_newmessage_json.go28
-rw-r--r--whisper/whisperv6/message.go214
-rw-r--r--whisper/whisperv6/message_test.go57
-rw-r--r--whisper/whisperv6/peer.go115
-rw-r--r--whisper/whisperv6/peer_test.go251
-rw-r--r--whisper/whisperv6/topic.go4
-rw-r--r--whisper/whisperv6/whisper.go633
-rw-r--r--whisper/whisperv6/whisper_test.go79
25 files changed, 1159 insertions, 600 deletions
diff --git a/whisper/mailserver/mailserver.go b/whisper/mailserver/mailserver.go
index 0ec6ec570..6555fd5c0 100644
--- a/whisper/mailserver/mailserver.go
+++ b/whisper/mailserver/mailserver.go
@@ -26,7 +26,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
- whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
+ whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
diff --git a/whisper/mailserver/server_test.go b/whisper/mailserver/server_test.go
index 9155ee85a..c8e0a553a 100644
--- a/whisper/mailserver/server_test.go
+++ b/whisper/mailserver/server_test.go
@@ -26,7 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
- whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
+ whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
)
const powRequirement = 0.00001
diff --git a/whisper/whisperv2/whisper.go b/whisper/whisperv2/whisper.go
index 61c36918d..e111a3414 100644
--- a/whisper/whisperv2/whisper.go
+++ b/whisper/whisperv2/whisper.go
@@ -262,7 +262,7 @@ func (self *Whisper) add(envelope *Envelope) error {
// Insert the message into the tracked pool
hash := envelope.Hash()
if _, ok := self.messages[hash]; ok {
- log.Trace(fmt.Sprintf("whisper envelope already cached: %x\n", envelope))
+ log.Trace(fmt.Sprintf("whisper envelope already cached: %x\n", hash))
return nil
}
self.messages[hash] = envelope
@@ -277,7 +277,7 @@ func (self *Whisper) add(envelope *Envelope) error {
// Notify the local node of a message arrival
go self.postEvent(envelope)
}
- log.Trace(fmt.Sprintf("cached whisper envelope %x\n", envelope))
+ log.Trace(fmt.Sprintf("cached whisper envelope %x\n", hash))
return nil
}
diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go
index 96c4b0e6c..b4494d0d6 100644
--- a/whisper/whisperv5/api.go
+++ b/whisper/whisperv5/api.go
@@ -562,7 +562,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) {
}
if len(req.Topics) > 0 {
- topics = make([][]byte, 1)
+ topics = make([][]byte, 0, len(req.Topics))
for _, topic := range req.Topics {
topics = append(topics, topic[:])
}
diff --git a/whisper/whisperv5/filter.go b/whisper/whisperv5/filter.go
index b5e893e0f..3190334eb 100644
--- a/whisper/whisperv5/filter.go
+++ b/whisper/whisperv5/filter.go
@@ -216,8 +216,12 @@ func (f *Filter) MatchTopic(topic TopicType) bool {
}
func matchSingleTopic(topic TopicType, bt []byte) bool {
- if len(bt) > 4 {
- bt = bt[:4]
+ if len(bt) > TopicLength {
+ bt = bt[:TopicLength]
+ }
+
+ if len(bt) < TopicLength {
+ return false
}
for j, b := range bt {
diff --git a/whisper/whisperv5/filter_test.go b/whisper/whisperv5/filter_test.go
index bd35e7f20..01034a351 100644
--- a/whisper/whisperv5/filter_test.go
+++ b/whisper/whisperv5/filter_test.go
@@ -88,7 +88,7 @@ func generateTestCases(t *testing.T, SizeTestFilters int) []FilterTestCase {
for i := 0; i < SizeTestFilters; i++ {
f, _ := generateFilter(t, true)
cases[i].f = f
- cases[i].alive = (mrand.Int()&int(1) == 0)
+ cases[i].alive = mrand.Int()&int(1) == 0
}
return cases
}
@@ -122,7 +122,7 @@ func TestInstallFilters(t *testing.T) {
for i, testCase := range tst {
fil := filters.Get(testCase.id)
- exist := (fil != nil)
+ exist := fil != nil
if exist != testCase.alive {
t.Fatalf("seed %d: failed alive: %d, %v, %v", seed, i, exist, testCase.alive)
}
@@ -776,6 +776,7 @@ func TestWatchers(t *testing.T) {
func TestVariableTopics(t *testing.T) {
InitSingleTest()
+ const lastTopicByte = 3
var match bool
params, err := generateMessageParams()
if err != nil {
@@ -796,19 +797,52 @@ func TestVariableTopics(t *testing.T) {
}
for i := 0; i < 4; i++ {
- arr := make([]byte, i+1, 4)
- copy(arr, env.Topic[:i+1])
-
- f.Topics[4] = arr
+ env.Topic = BytesToTopic(f.Topics[i])
match = f.MatchEnvelope(env)
if !match {
t.Fatalf("failed MatchEnvelope symmetric with seed %d, step %d.", seed, i)
}
- f.Topics[4][i]++
+ f.Topics[i][lastTopicByte]++
match = f.MatchEnvelope(env)
if match {
t.Fatalf("MatchEnvelope symmetric with seed %d, step %d: false positive.", seed, i)
}
}
}
+
+func TestMatchSingleTopic_ReturnTrue(t *testing.T) {
+ bt := []byte("test")
+ topic := BytesToTopic(bt)
+
+ if !matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
+
+func TestMatchSingleTopic_WithTail_ReturnTrue(t *testing.T) {
+ bt := []byte("test with tail")
+ topic := BytesToTopic([]byte("test"))
+
+ if !matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
+
+func TestMatchSingleTopic_NotEquals_ReturnFalse(t *testing.T) {
+ bt := []byte("tes")
+ topic := BytesToTopic(bt)
+
+ if matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
+
+func TestMatchSingleTopic_InsufficientLength_ReturnFalse(t *testing.T) {
+ bt := []byte("test")
+ topic := BytesToTopic([]byte("not_equal"))
+
+ if matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
diff --git a/whisper/whisperv5/gen_criteria_json.go b/whisper/whisperv5/gen_criteria_json.go
index df0de85df..1c0e389ad 100644
--- a/whisper/whisperv5/gen_criteria_json.go
+++ b/whisper/whisperv5/gen_criteria_json.go
@@ -31,12 +31,12 @@ func (c Criteria) MarshalJSON() ([]byte, error) {
func (c *Criteria) UnmarshalJSON(input []byte) error {
type Criteria struct {
- SymKeyID *string `json:"symKeyID"`
- PrivateKeyID *string `json:"privateKeyID"`
- Sig hexutil.Bytes `json:"sig"`
- MinPow *float64 `json:"minPow"`
- Topics []TopicType `json:"topics"`
- AllowP2P *bool `json:"allowP2P"`
+ SymKeyID *string `json:"symKeyID"`
+ PrivateKeyID *string `json:"privateKeyID"`
+ Sig *hexutil.Bytes `json:"sig"`
+ MinPow *float64 `json:"minPow"`
+ Topics []TopicType `json:"topics"`
+ AllowP2P *bool `json:"allowP2P"`
}
var dec Criteria
if err := json.Unmarshal(input, &dec); err != nil {
@@ -49,7 +49,7 @@ func (c *Criteria) UnmarshalJSON(input []byte) error {
c.PrivateKeyID = *dec.PrivateKeyID
}
if dec.Sig != nil {
- c.Sig = dec.Sig
+ c.Sig = *dec.Sig
}
if dec.MinPow != nil {
c.MinPow = *dec.MinPow
diff --git a/whisper/whisperv5/gen_message_json.go b/whisper/whisperv5/gen_message_json.go
index 185557331..b4c4274d0 100644
--- a/whisper/whisperv5/gen_message_json.go
+++ b/whisper/whisperv5/gen_message_json.go
@@ -37,22 +37,22 @@ func (m Message) MarshalJSON() ([]byte, error) {
func (m *Message) UnmarshalJSON(input []byte) error {
type Message struct {
- Sig hexutil.Bytes `json:"sig,omitempty"`
- TTL *uint32 `json:"ttl"`
- Timestamp *uint32 `json:"timestamp"`
- Topic *TopicType `json:"topic"`
- Payload hexutil.Bytes `json:"payload"`
- Padding hexutil.Bytes `json:"padding"`
- PoW *float64 `json:"pow"`
- Hash hexutil.Bytes `json:"hash"`
- Dst hexutil.Bytes `json:"recipientPublicKey,omitempty"`
+ Sig *hexutil.Bytes `json:"sig,omitempty"`
+ TTL *uint32 `json:"ttl"`
+ Timestamp *uint32 `json:"timestamp"`
+ Topic *TopicType `json:"topic"`
+ Payload *hexutil.Bytes `json:"payload"`
+ Padding *hexutil.Bytes `json:"padding"`
+ PoW *float64 `json:"pow"`
+ Hash *hexutil.Bytes `json:"hash"`
+ Dst *hexutil.Bytes `json:"recipientPublicKey,omitempty"`
}
var dec Message
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
if dec.Sig != nil {
- m.Sig = dec.Sig
+ m.Sig = *dec.Sig
}
if dec.TTL != nil {
m.TTL = *dec.TTL
@@ -64,19 +64,19 @@ func (m *Message) UnmarshalJSON(input []byte) error {
m.Topic = *dec.Topic
}
if dec.Payload != nil {
- m.Payload = dec.Payload
+ m.Payload = *dec.Payload
}
if dec.Padding != nil {
- m.Padding = dec.Padding
+ m.Padding = *dec.Padding
}
if dec.PoW != nil {
m.PoW = *dec.PoW
}
if dec.Hash != nil {
- m.Hash = dec.Hash
+ m.Hash = *dec.Hash
}
if dec.Dst != nil {
- m.Dst = dec.Dst
+ m.Dst = *dec.Dst
}
return nil
}
diff --git a/whisper/whisperv5/gen_newmessage_json.go b/whisper/whisperv5/gen_newmessage_json.go
index d0a47185e..97ffb64ad 100644
--- a/whisper/whisperv5/gen_newmessage_json.go
+++ b/whisper/whisperv5/gen_newmessage_json.go
@@ -39,16 +39,16 @@ func (n NewMessage) MarshalJSON() ([]byte, error) {
func (n *NewMessage) UnmarshalJSON(input []byte) error {
type NewMessage struct {
- SymKeyID *string `json:"symKeyID"`
- PublicKey hexutil.Bytes `json:"pubKey"`
- Sig *string `json:"sig"`
- TTL *uint32 `json:"ttl"`
- Topic *TopicType `json:"topic"`
- Payload hexutil.Bytes `json:"payload"`
- Padding hexutil.Bytes `json:"padding"`
- PowTime *uint32 `json:"powTime"`
- PowTarget *float64 `json:"powTarget"`
- TargetPeer *string `json:"targetPeer"`
+ SymKeyID *string `json:"symKeyID"`
+ PublicKey *hexutil.Bytes `json:"pubKey"`
+ Sig *string `json:"sig"`
+ TTL *uint32 `json:"ttl"`
+ Topic *TopicType `json:"topic"`
+ Payload *hexutil.Bytes `json:"payload"`
+ Padding *hexutil.Bytes `json:"padding"`
+ PowTime *uint32 `json:"powTime"`
+ PowTarget *float64 `json:"powTarget"`
+ TargetPeer *string `json:"targetPeer"`
}
var dec NewMessage
if err := json.Unmarshal(input, &dec); err != nil {
@@ -58,7 +58,7 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error {
n.SymKeyID = *dec.SymKeyID
}
if dec.PublicKey != nil {
- n.PublicKey = dec.PublicKey
+ n.PublicKey = *dec.PublicKey
}
if dec.Sig != nil {
n.Sig = *dec.Sig
@@ -70,10 +70,10 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error {
n.Topic = *dec.Topic
}
if dec.Payload != nil {
- n.Payload = dec.Payload
+ n.Payload = *dec.Payload
}
if dec.Padding != nil {
- n.Padding = dec.Padding
+ n.Padding = *dec.Padding
}
if dec.PowTime != nil {
n.PowTime = *dec.PowTime
diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go
index 3dddb6953..a2c75a41c 100644
--- a/whisper/whisperv6/api.go
+++ b/whisper/whisperv6/api.go
@@ -36,6 +36,7 @@ const (
filterTimeout = 300 // filters are considered timeout out after filterTimeout seconds
)
+// List of errors
var (
ErrSymAsym = errors.New("specify either a symmetric or an asymmetric key")
ErrInvalidSymmetricKey = errors.New("invalid symmetric key")
@@ -116,12 +117,17 @@ func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32)
return true, api.w.SetMaxMessageSize(size)
}
-// SetMinPow sets the minimum PoW for a message before it is accepted.
+// SetMinPoW sets the minimum PoW, and notifies the peers.
func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) {
return true, api.w.SetMinimumPoW(pow)
}
-// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages.
+// SetBloomFilter sets the new value of bloom filter, and notifies the peers.
+func (api *PublicWhisperAPI) SetBloomFilter(ctx context.Context, bloom hexutil.Bytes) (bool, error) {
+ return true, api.w.SetBloomFilter(bloom)
+}
+
+// MarkTrustedPeer marks a peer trusted, which will allow it to send historic (expired) messages.
// Note: This function is not adding new nodes, the node needs to exists as a peer.
func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) {
n, err := discover.ParseNode(enode)
@@ -169,7 +175,7 @@ func (api *PublicWhisperAPI) GetPublicKey(ctx context.Context, id string) (hexut
return crypto.FromECDSAPub(&key.PublicKey), nil
}
-// GetPublicKey returns the private key associated with the given key. The key is the hex
+// GetPrivateKey returns the private key associated with the given key. The key is the hex
// encoded representation of a key in the form specified in section 4.3.6 of ANSI X9.62.
func (api *PublicWhisperAPI) GetPrivateKey(ctx context.Context, id string) (hexutil.Bytes, error) {
key, err := api.w.GetPrivateKey(id)
@@ -272,7 +278,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return false, err
}
- if !validateSymmetricKey(params.KeySym) {
+ if !validateDataIntegrity(params.KeySym, aesKeyLength) {
return false, ErrInvalidSymmetricKey
}
}
@@ -378,7 +384,7 @@ func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc.
if err != nil {
return nil, err
}
- if !validateSymmetricKey(key) {
+ if !validateDataIntegrity(key, aesKeyLength) {
return nil, ErrInvalidSymmetricKey
}
filter.KeySym = key
@@ -550,7 +556,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) {
if keySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return "", err
}
- if !validateSymmetricKey(keySym) {
+ if !validateDataIntegrity(keySym, aesKeyLength) {
return "", ErrInvalidSymmetricKey
}
}
@@ -562,7 +568,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) {
}
if len(req.Topics) > 0 {
- topics = make([][]byte, 1)
+ topics = make([][]byte, 0, len(req.Topics))
for _, topic := range req.Topics {
topics = append(topics, topic[:])
}
diff --git a/whisper/whisperv6/config.go b/whisper/whisperv6/config.go
index d7f817aa2..61419de00 100644
--- a/whisper/whisperv6/config.go
+++ b/whisper/whisperv6/config.go
@@ -16,11 +16,13 @@
package whisperv6
+// Config represents the configuration state of a whisper node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
MinimumAcceptedPOW float64 `toml:",omitempty"`
}
+// DefaultConfig represents (shocker!) the default configuration.
var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize,
MinimumAcceptedPOW: DefaultMinimumPoW,
diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go
index 2a4911d65..d5d7fed60 100644
--- a/whisper/whisperv6/doc.go
+++ b/whisper/whisperv6/doc.go
@@ -27,6 +27,9 @@ Whisper is a pure identity-based messaging system. Whisper provides a low-level
or prejudiced by the low-level hardware attributes and characteristics,
particularly the notion of singular endpoints.
*/
+
+// Contains the Whisper protocol constant definitions
+
package whisperv6
import (
@@ -34,11 +37,11 @@ import (
"time"
)
+// Whisper protocol parameters
const (
- EnvelopeVersion = uint64(0)
- ProtocolVersion = uint64(6)
- ProtocolVersionStr = "6.0"
- ProtocolName = "shh"
+ ProtocolVersion = uint64(6) // Protocol version number
+ ProtocolVersionStr = "6.0" // The same, as a string
+ ProtocolName = "shh" // Nickname of the protocol in geth
// whisper protocol message codes, according to EIP-627
statusCode = 0 // used by whisper protocol
@@ -49,29 +52,31 @@ const (
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128
- paddingMask = byte(3)
+ SizeMask = byte(3) // mask used to extract the size of payload size field from the flags
signatureFlag = byte(4)
- TopicLength = 4
- signatureLength = 65
- aesKeyLength = 32
- AESNonceLength = 12
- keyIdSize = 32
+ TopicLength = 4 // in bytes
+ signatureLength = 65 // in bytes
+ aesKeyLength = 32 // in bytes
+ aesNonceLength = 12 // in bytes; for more info please see cipher.gcmStandardNonceSize & aesgcm.NonceSize()
+ keyIDSize = 32 // in bytes
+ bloomFilterSize = 64 // in bytes
+ flagsLength = 1
+
+ EnvelopeHeaderLength = 20
MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message.
DefaultMaxMessageSize = uint32(1024 * 1024)
DefaultMinimumPoW = 0.2
- padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol (must not exceed 2^24)
+ padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol
messageQueueLimit = 1024
expirationCycle = time.Second
transmissionCycle = 300 * time.Millisecond
- DefaultTTL = 50 // seconds
- SynchAllowance = 10 // seconds
-
- EnvelopeHeaderLength = 20
+ DefaultTTL = 50 // seconds
+ DefaultSyncAllowance = 10 // seconds
)
type unknownVersionError uint64
diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go
index 676df669b..c7bea2bb9 100644
--- a/whisper/whisperv6/envelope.go
+++ b/whisper/whisperv6/envelope.go
@@ -42,9 +42,11 @@ type Envelope struct {
Data []byte
Nonce uint64
- pow float64 // Message-specific PoW as described in the Whisper specification.
- hash common.Hash // Cached hash of the envelope to avoid rehashing every time.
- // Don't access hash directly, use Hash() function instead.
+ pow float64 // Message-specific PoW as described in the Whisper specification.
+
+ // the following variables should not be accessed directly, use the corresponding function instead: Hash(), Bloom()
+ hash common.Hash // Cached hash of the envelope to avoid rehashing every time.
+ bloom []byte
}
// size returns the size of envelope as it is sent (i.e. public fields only)
@@ -75,15 +77,19 @@ func NewEnvelope(ttl uint32, topic TopicType, msg *sentMessage) *Envelope {
// Seal closes the envelope by spending the requested amount of time as a proof
// of work on hashing the data.
func (e *Envelope) Seal(options *MessageParams) error {
- var target, bestBit int
if options.PoW == 0 {
- // adjust for the duration of Seal() execution only if execution time is predefined unconditionally
+ // PoW is not required
+ return nil
+ }
+
+ var target, bestBit int
+ if options.PoW < 0 {
+ // target is not set - the function should run for a period
+ // of time specified in WorkTime param. Since we can predict
+ // the execution time, we can also adjust Expiry.
e.Expiry += options.WorkTime
} else {
target = e.powToFirstBit(options.PoW)
- if target < 1 {
- target = 1
- }
}
buf := make([]byte, 64)
@@ -113,6 +119,8 @@ func (e *Envelope) Seal(options *MessageParams) error {
return nil
}
+// PoW computes (if necessary) and returns the proof of work target
+// of the envelope.
func (e *Envelope) PoW() float64 {
if e.pow == 0 {
e.calculatePoW(0)
@@ -139,7 +147,11 @@ func (e *Envelope) powToFirstBit(pow float64) int {
x *= float64(e.TTL)
bits := gmath.Log2(x)
bits = gmath.Ceil(bits)
- return int(bits)
+ res := int(bits)
+ if res < 1 {
+ res = 1
+ }
+ return res
}
// Hash returns the SHA3 hash of the envelope, calculating it if not yet done.
@@ -196,8 +208,7 @@ func (e *Envelope) OpenSymmetric(key []byte) (msg *ReceivedMessage, err error) {
// Open tries to decrypt an envelope, and populates the message fields in case of success.
func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
- // The API interface forbids filters doing both symmetric and
- // asymmetric encryption.
+ // The API interface forbids filters doing both symmetric and asymmetric encryption.
if watcher.expectsAsymmetricEncryption() && watcher.expectsSymmetricEncryption() {
return nil
}
@@ -215,7 +226,7 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
}
if msg != nil {
- ok := msg.Validate()
+ ok := msg.ValidateAndParse()
if !ok {
return nil
}
@@ -227,3 +238,30 @@ func (e *Envelope) Open(watcher *Filter) (msg *ReceivedMessage) {
}
return msg
}
+
+// Bloom maps 4-bytes Topic into 64-byte bloom filter with 3 bits set (at most).
+func (e *Envelope) Bloom() []byte {
+ if e.bloom == nil {
+ e.bloom = TopicToBloom(e.Topic)
+ }
+ return e.bloom
+}
+
+// TopicToBloom converts the topic (4 bytes) to the bloom filter (64 bytes)
+func TopicToBloom(topic TopicType) []byte {
+ b := make([]byte, bloomFilterSize)
+ var index [3]int
+ for j := 0; j < 3; j++ {
+ index[j] = int(topic[j])
+ if (topic[3] & (1 << uint(j))) != 0 {
+ index[j] += 256
+ }
+ }
+
+ for j := 0; j < 3; j++ {
+ byteIndex := index[j] / 8
+ bitIndex := index[j] % 8
+ b[byteIndex] = (1 << uint(bitIndex))
+ }
+ return b
+}
diff --git a/whisper/whisperv6/filter.go b/whisper/whisperv6/filter.go
index 2f52dd6b9..eb0c65fa3 100644
--- a/whisper/whisperv6/filter.go
+++ b/whisper/whisperv6/filter.go
@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/log"
)
+// Filter represents a Whisper message filter
type Filter struct {
Src *ecdsa.PublicKey // Sender of the message
KeyAsym *ecdsa.PrivateKey // Private Key of recipient
@@ -39,12 +40,14 @@ type Filter struct {
mutex sync.RWMutex
}
+// Filters represents a collection of filters
type Filters struct {
watchers map[string]*Filter
whisper *Whisper
mutex sync.RWMutex
}
+// NewFilters returns a newly created filter collection
func NewFilters(w *Whisper) *Filters {
return &Filters{
watchers: make(map[string]*Filter),
@@ -52,6 +55,7 @@ func NewFilters(w *Whisper) *Filters {
}
}
+// Install will add a new filter to the filter collection
func (fs *Filters) Install(watcher *Filter) (string, error) {
if watcher.KeySym != nil && watcher.KeyAsym != nil {
return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys")
@@ -81,6 +85,8 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
return id, err
}
+// Uninstall will remove a filter whose id has been specified from
+// the filter collection
func (fs *Filters) Uninstall(id string) bool {
fs.mutex.Lock()
defer fs.mutex.Unlock()
@@ -91,12 +97,15 @@ func (fs *Filters) Uninstall(id string) bool {
return false
}
+// Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
return fs.watchers[id]
}
+// NotifyWatchers notifies any filter that has declared interest
+// for the envelope's topic.
func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
var msg *ReceivedMessage
@@ -140,9 +149,9 @@ func (f *Filter) processEnvelope(env *Envelope) *ReceivedMessage {
msg := env.Open(f)
if msg != nil {
return msg
- } else {
- log.Trace("processing envelope: failed to open", "hash", env.Hash().Hex())
}
+
+ log.Trace("processing envelope: failed to open", "hash", env.Hash().Hex())
} else {
log.Trace("processing envelope: does not match", "hash", env.Hash().Hex())
}
@@ -157,6 +166,8 @@ func (f *Filter) expectsSymmetricEncryption() bool {
return f.KeySym != nil
}
+// Trigger adds a yet-unknown message to the filter's list of
+// received messages.
func (f *Filter) Trigger(msg *ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
@@ -166,6 +177,8 @@ func (f *Filter) Trigger(msg *ReceivedMessage) {
}
}
+// Retrieve will return the list of all received messages associated
+// to a filter.
func (f *Filter) Retrieve() (all []*ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
@@ -195,7 +208,7 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
return false
}
-// MatchEvelope checks if it's worth decrypting the message. If
+// MatchEnvelope checks if it's worth decrypting the message. If
// it returns `true`, client code is expected to attempt decrypting
// the message and subsequently call MatchMessage.
func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
@@ -206,6 +219,7 @@ func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
return f.MatchTopic(envelope.Topic)
}
+// MatchTopic checks that the filter captures a given topic.
func (f *Filter) MatchTopic(topic TopicType) bool {
if len(f.Topics) == 0 {
// any topic matches
@@ -221,8 +235,12 @@ func (f *Filter) MatchTopic(topic TopicType) bool {
}
func matchSingleTopic(topic TopicType, bt []byte) bool {
- if len(bt) > 4 {
- bt = bt[:4]
+ if len(bt) > TopicLength {
+ bt = bt[:TopicLength]
+ }
+
+ if len(bt) < TopicLength {
+ return false
}
for j, b := range bt {
@@ -233,6 +251,7 @@ func matchSingleTopic(topic TopicType, bt []byte) bool {
return true
}
+// IsPubKeyEqual checks that two public keys are equal
func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
if !ValidatePublicKey(a) {
return false
diff --git a/whisper/whisperv6/filter_test.go b/whisper/whisperv6/filter_test.go
index dd0de0f6e..e7230ef38 100644
--- a/whisper/whisperv6/filter_test.go
+++ b/whisper/whisperv6/filter_test.go
@@ -88,7 +88,7 @@ func generateTestCases(t *testing.T, SizeTestFilters int) []FilterTestCase {
for i := 0; i < SizeTestFilters; i++ {
f, _ := generateFilter(t, true)
cases[i].f = f
- cases[i].alive = (mrand.Int()&int(1) == 0)
+ cases[i].alive = mrand.Int()&int(1) == 0
}
return cases
}
@@ -109,7 +109,7 @@ func TestInstallFilters(t *testing.T) {
t.Fatalf("seed %d: failed to install filter: %s", seed, err)
}
tst[i].id = j
- if len(j) != keyIdSize*2 {
+ if len(j) != keyIDSize*2 {
t.Fatalf("seed %d: wrong filter id size [%d]", seed, len(j))
}
}
@@ -122,7 +122,7 @@ func TestInstallFilters(t *testing.T) {
for i, testCase := range tst {
fil := filters.Get(testCase.id)
- exist := (fil != nil)
+ exist := fil != nil
if exist != testCase.alive {
t.Fatalf("seed %d: failed alive: %d, %v, %v", seed, i, exist, testCase.alive)
}
@@ -800,6 +800,7 @@ func TestWatchers(t *testing.T) {
func TestVariableTopics(t *testing.T) {
InitSingleTest()
+ const lastTopicByte = 3
var match bool
params, err := generateMessageParams()
if err != nil {
@@ -820,19 +821,52 @@ func TestVariableTopics(t *testing.T) {
}
for i := 0; i < 4; i++ {
- arr := make([]byte, i+1, 4)
- copy(arr, env.Topic[:i+1])
-
- f.Topics[4] = arr
+ env.Topic = BytesToTopic(f.Topics[i])
match = f.MatchEnvelope(env)
if !match {
t.Fatalf("failed MatchEnvelope symmetric with seed %d, step %d.", seed, i)
}
- f.Topics[4][i]++
+ f.Topics[i][lastTopicByte]++
match = f.MatchEnvelope(env)
if match {
t.Fatalf("MatchEnvelope symmetric with seed %d, step %d: false positive.", seed, i)
}
}
}
+
+func TestMatchSingleTopic_ReturnTrue(t *testing.T) {
+ bt := []byte("test")
+ topic := BytesToTopic(bt)
+
+ if !matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
+
+func TestMatchSingleTopic_WithTail_ReturnTrue(t *testing.T) {
+ bt := []byte("test with tail")
+ topic := BytesToTopic([]byte("test"))
+
+ if !matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
+
+func TestMatchSingleTopic_NotEquals_ReturnFalse(t *testing.T) {
+ bt := []byte("tes")
+ topic := BytesToTopic(bt)
+
+ if matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
+
+func TestMatchSingleTopic_InsufficientLength_ReturnFalse(t *testing.T) {
+ bt := []byte("test")
+ topic := BytesToTopic([]byte("not_equal"))
+
+ if matchSingleTopic(topic, bt) {
+ t.FailNow()
+ }
+}
diff --git a/whisper/whisperv6/gen_criteria_json.go b/whisper/whisperv6/gen_criteria_json.go
index 52a4d3cb6..1a428d6df 100644
--- a/whisper/whisperv6/gen_criteria_json.go
+++ b/whisper/whisperv6/gen_criteria_json.go
@@ -10,6 +10,7 @@ import (
var _ = (*criteriaOverride)(nil)
+// MarshalJSON marshals type Criteria to a json string
func (c Criteria) MarshalJSON() ([]byte, error) {
type Criteria struct {
SymKeyID string `json:"symKeyID"`
@@ -29,14 +30,15 @@ func (c Criteria) MarshalJSON() ([]byte, error) {
return json.Marshal(&enc)
}
+// UnmarshalJSON unmarshals type Criteria to a json string
func (c *Criteria) UnmarshalJSON(input []byte) error {
type Criteria struct {
- SymKeyID *string `json:"symKeyID"`
- PrivateKeyID *string `json:"privateKeyID"`
- Sig hexutil.Bytes `json:"sig"`
- MinPow *float64 `json:"minPow"`
- Topics []TopicType `json:"topics"`
- AllowP2P *bool `json:"allowP2P"`
+ SymKeyID *string `json:"symKeyID"`
+ PrivateKeyID *string `json:"privateKeyID"`
+ Sig *hexutil.Bytes `json:"sig"`
+ MinPow *float64 `json:"minPow"`
+ Topics []TopicType `json:"topics"`
+ AllowP2P *bool `json:"allowP2P"`
}
var dec Criteria
if err := json.Unmarshal(input, &dec); err != nil {
@@ -49,7 +51,7 @@ func (c *Criteria) UnmarshalJSON(input []byte) error {
c.PrivateKeyID = *dec.PrivateKeyID
}
if dec.Sig != nil {
- c.Sig = dec.Sig
+ c.Sig = *dec.Sig
}
if dec.MinPow != nil {
c.MinPow = *dec.MinPow
diff --git a/whisper/whisperv6/gen_message_json.go b/whisper/whisperv6/gen_message_json.go
index 27b46752b..6218f5df6 100644
--- a/whisper/whisperv6/gen_message_json.go
+++ b/whisper/whisperv6/gen_message_json.go
@@ -10,6 +10,7 @@ import (
var _ = (*messageOverride)(nil)
+// MarshalJSON marshals type Message to a json string
func (m Message) MarshalJSON() ([]byte, error) {
type Message struct {
Sig hexutil.Bytes `json:"sig,omitempty"`
@@ -35,24 +36,25 @@ func (m Message) MarshalJSON() ([]byte, error) {
return json.Marshal(&enc)
}
+// UnmarshalJSON unmarshals type Message to a json string
func (m *Message) UnmarshalJSON(input []byte) error {
type Message struct {
- Sig hexutil.Bytes `json:"sig,omitempty"`
- TTL *uint32 `json:"ttl"`
- Timestamp *uint32 `json:"timestamp"`
- Topic *TopicType `json:"topic"`
- Payload hexutil.Bytes `json:"payload"`
- Padding hexutil.Bytes `json:"padding"`
- PoW *float64 `json:"pow"`
- Hash hexutil.Bytes `json:"hash"`
- Dst hexutil.Bytes `json:"recipientPublicKey,omitempty"`
+ Sig *hexutil.Bytes `json:"sig,omitempty"`
+ TTL *uint32 `json:"ttl"`
+ Timestamp *uint32 `json:"timestamp"`
+ Topic *TopicType `json:"topic"`
+ Payload *hexutil.Bytes `json:"payload"`
+ Padding *hexutil.Bytes `json:"padding"`
+ PoW *float64 `json:"pow"`
+ Hash *hexutil.Bytes `json:"hash"`
+ Dst *hexutil.Bytes `json:"recipientPublicKey,omitempty"`
}
var dec Message
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
if dec.Sig != nil {
- m.Sig = dec.Sig
+ m.Sig = *dec.Sig
}
if dec.TTL != nil {
m.TTL = *dec.TTL
@@ -64,19 +66,19 @@ func (m *Message) UnmarshalJSON(input []byte) error {
m.Topic = *dec.Topic
}
if dec.Payload != nil {
- m.Payload = dec.Payload
+ m.Payload = *dec.Payload
}
if dec.Padding != nil {
- m.Padding = dec.Padding
+ m.Padding = *dec.Padding
}
if dec.PoW != nil {
m.PoW = *dec.PoW
}
if dec.Hash != nil {
- m.Hash = dec.Hash
+ m.Hash = *dec.Hash
}
if dec.Dst != nil {
- m.Dst = dec.Dst
+ m.Dst = *dec.Dst
}
return nil
}
diff --git a/whisper/whisperv6/gen_newmessage_json.go b/whisper/whisperv6/gen_newmessage_json.go
index d16011a57..75a1279ae 100644
--- a/whisper/whisperv6/gen_newmessage_json.go
+++ b/whisper/whisperv6/gen_newmessage_json.go
@@ -10,6 +10,7 @@ import (
var _ = (*newMessageOverride)(nil)
+// MarshalJSON marshals type NewMessage to a json string
func (n NewMessage) MarshalJSON() ([]byte, error) {
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
@@ -37,18 +38,19 @@ func (n NewMessage) MarshalJSON() ([]byte, error) {
return json.Marshal(&enc)
}
+// UnmarshalJSON unmarshals type NewMessage to a json string
func (n *NewMessage) UnmarshalJSON(input []byte) error {
type NewMessage struct {
- SymKeyID *string `json:"symKeyID"`
- PublicKey hexutil.Bytes `json:"pubKey"`
- Sig *string `json:"sig"`
- TTL *uint32 `json:"ttl"`
- Topic *TopicType `json:"topic"`
- Payload hexutil.Bytes `json:"payload"`
- Padding hexutil.Bytes `json:"padding"`
- PowTime *uint32 `json:"powTime"`
- PowTarget *float64 `json:"powTarget"`
- TargetPeer *string `json:"targetPeer"`
+ SymKeyID *string `json:"symKeyID"`
+ PublicKey *hexutil.Bytes `json:"pubKey"`
+ Sig *string `json:"sig"`
+ TTL *uint32 `json:"ttl"`
+ Topic *TopicType `json:"topic"`
+ Payload *hexutil.Bytes `json:"payload"`
+ Padding *hexutil.Bytes `json:"padding"`
+ PowTime *uint32 `json:"powTime"`
+ PowTarget *float64 `json:"powTarget"`
+ TargetPeer *string `json:"targetPeer"`
}
var dec NewMessage
if err := json.Unmarshal(input, &dec); err != nil {
@@ -58,7 +60,7 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error {
n.SymKeyID = *dec.SymKeyID
}
if dec.PublicKey != nil {
- n.PublicKey = dec.PublicKey
+ n.PublicKey = *dec.PublicKey
}
if dec.Sig != nil {
n.Sig = *dec.Sig
@@ -70,10 +72,10 @@ func (n *NewMessage) UnmarshalJSON(input []byte) error {
n.Topic = *dec.Topic
}
if dec.Payload != nil {
- n.Payload = dec.Payload
+ n.Payload = *dec.Payload
}
if dec.Padding != nil {
- n.Padding = dec.Padding
+ n.Padding = *dec.Padding
}
if dec.PowTime != nil {
n.PowTime = *dec.PowTime
diff --git a/whisper/whisperv6/message.go b/whisper/whisperv6/message.go
index f8df50336..b8318cbe8 100644
--- a/whisper/whisperv6/message.go
+++ b/whisper/whisperv6/message.go
@@ -25,6 +25,7 @@ import (
crand "crypto/rand"
"encoding/binary"
"errors"
+ mrand "math/rand"
"strconv"
"github.com/ethereum/go-ethereum/common"
@@ -33,7 +34,8 @@ import (
"github.com/ethereum/go-ethereum/log"
)
-// Options specifies the exact way a message should be wrapped into an Envelope.
+// MessageParams specifies the exact way a message should be wrapped
+// into an Envelope.
type MessageParams struct {
TTL uint32
Src *ecdsa.PrivateKey
@@ -54,7 +56,7 @@ type sentMessage struct {
}
// ReceivedMessage represents a data packet to be received through the
-// Whisper protocol.
+// Whisper protocol and successfully decrypted.
type ReceivedMessage struct {
Raw []byte
@@ -70,7 +72,7 @@ type ReceivedMessage struct {
Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message)
Topic TopicType
- SymKeyHash common.Hash // The Keccak256Hash of the key, associated with the Topic
+ SymKeyHash common.Hash // The Keccak256Hash of the key
EnvelopeHash common.Hash // Message envelope hash to act as a unique id
}
@@ -86,83 +88,62 @@ func (msg *ReceivedMessage) isAsymmetricEncryption() bool {
return msg.Dst != nil
}
-// NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
+// NewSentMessage creates and initializes a non-signed, non-encrypted Whisper message.
func NewSentMessage(params *MessageParams) (*sentMessage, error) {
+ const payloadSizeFieldMaxSize = 4
msg := sentMessage{}
- msg.Raw = make([]byte, 1, len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit)
+ msg.Raw = make([]byte, 1,
+ flagsLength+payloadSizeFieldMaxSize+len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit)
msg.Raw[0] = 0 // set all the flags to zero
- err := msg.appendPadding(params)
- if err != nil {
- return nil, err
- }
+ msg.addPayloadSizeField(params.Payload)
msg.Raw = append(msg.Raw, params.Payload...)
- return &msg, nil
+ err := msg.appendPadding(params)
+ return &msg, err
}
-// getSizeOfLength returns the number of bytes necessary to encode the entire size padding (including these bytes)
-func getSizeOfLength(b []byte) (sz int, err error) {
- sz = intSize(len(b)) // first iteration
- sz = intSize(len(b) + sz) // second iteration
- if sz > 3 {
- err = errors.New("oversized padding parameter")
- }
- return sz, err
+// addPayloadSizeField appends the auxiliary field containing the size of payload
+func (msg *sentMessage) addPayloadSizeField(payload []byte) {
+ fieldSize := getSizeOfPayloadSizeField(payload)
+ field := make([]byte, 4)
+ binary.LittleEndian.PutUint32(field, uint32(len(payload)))
+ field = field[:fieldSize]
+ msg.Raw = append(msg.Raw, field...)
+ msg.Raw[0] |= byte(fieldSize)
}
-// sizeOfIntSize returns minimal number of bytes necessary to encode an integer value
-func intSize(i int) (s int) {
- for s = 1; i >= 256; s++ {
- i /= 256
+// getSizeOfPayloadSizeField returns the number of bytes necessary to encode the size of payload
+func getSizeOfPayloadSizeField(payload []byte) int {
+ s := 1
+ for i := len(payload); i >= 256; i /= 256 {
+ s++
}
return s
}
-// appendPadding appends the pseudorandom padding bytes and sets the padding flag.
-// The last byte contains the size of padding (thus, its size must not exceed 256).
+// appendPadding appends the padding specified in params.
+// If no padding is provided in params, then random padding is generated.
func (msg *sentMessage) appendPadding(params *MessageParams) error {
- rawSize := len(params.Payload) + 1
- if params.Src != nil {
- rawSize += signatureLength
+ if len(params.Padding) != 0 {
+ // padding data was provided by the Dapp, just use it as is
+ msg.Raw = append(msg.Raw, params.Padding...)
+ return nil
}
- if params.KeySym != nil {
- rawSize += AESNonceLength
+ rawSize := flagsLength + getSizeOfPayloadSizeField(params.Payload) + len(params.Payload)
+ if params.Src != nil {
+ rawSize += signatureLength
}
odd := rawSize % padSizeLimit
-
- if len(params.Padding) != 0 {
- padSize := len(params.Padding)
- padLengthSize, err := getSizeOfLength(params.Padding)
- if err != nil {
- return err
- }
- totalPadSize := padSize + padLengthSize
- buf := make([]byte, 8)
- binary.LittleEndian.PutUint32(buf, uint32(totalPadSize))
- buf = buf[:padLengthSize]
- msg.Raw = append(msg.Raw, buf...)
- msg.Raw = append(msg.Raw, params.Padding...)
- msg.Raw[0] |= byte(padLengthSize) // number of bytes indicating the padding size
- } else if odd != 0 {
- totalPadSize := padSizeLimit - odd
- if totalPadSize > 255 {
- // this algorithm is only valid if padSizeLimit < 256.
- // if padSizeLimit will ever change, please fix the algorithm
- // (please see also ReceivedMessage.extractPadding() function).
- panic("please fix the padding algorithm before releasing new version")
- }
- buf := make([]byte, totalPadSize)
- _, err := crand.Read(buf[1:])
- if err != nil {
- return err
- }
- if totalPadSize > 6 && !validateSymmetricKey(buf) {
- return errors.New("failed to generate random padding of size " + strconv.Itoa(totalPadSize))
- }
- buf[0] = byte(totalPadSize)
- msg.Raw = append(msg.Raw, buf...)
- msg.Raw[0] |= byte(0x1) // number of bytes indicating the padding size
+ paddingSize := padSizeLimit - odd
+ pad := make([]byte, paddingSize)
+ _, err := crand.Read(pad)
+ if err != nil {
+ return err
}
+ if !validateDataIntegrity(pad, paddingSize) {
+ return errors.New("failed to generate random padding of size " + strconv.Itoa(paddingSize))
+ }
+ msg.Raw = append(msg.Raw, pad...)
return nil
}
@@ -175,11 +156,11 @@ func (msg *sentMessage) sign(key *ecdsa.PrivateKey) error {
return nil
}
- msg.Raw[0] |= signatureFlag
+ msg.Raw[0] |= signatureFlag // it is important to set this flag before signing
hash := crypto.Keccak256(msg.Raw)
signature, err := crypto.Sign(hash, key)
if err != nil {
- msg.Raw[0] &= ^signatureFlag // clear the flag
+ msg.Raw[0] &= (0xFF ^ signatureFlag) // clear the flag
return err
}
msg.Raw = append(msg.Raw, signature...)
@@ -201,10 +182,9 @@ func (msg *sentMessage) encryptAsymmetric(key *ecdsa.PublicKey) error {
// encryptSymmetric encrypts a message with a topic key, using AES-GCM-256.
// nonce size should be 12 bytes (see cipher.gcmStandardNonceSize).
func (msg *sentMessage) encryptSymmetric(key []byte) (err error) {
- if !validateSymmetricKey(key) {
- return errors.New("invalid key provided for symmetric encryption")
+ if !validateDataIntegrity(key, aesKeyLength) {
+ return errors.New("invalid key provided for symmetric encryption, size: " + strconv.Itoa(len(key)))
}
-
block, err := aes.NewCipher(key)
if err != nil {
return err
@@ -213,20 +193,46 @@ func (msg *sentMessage) encryptSymmetric(key []byte) (err error) {
if err != nil {
return err
}
-
- // never use more than 2^32 random nonces with a given key
- salt := make([]byte, aesgcm.NonceSize())
- _, err = crand.Read(salt)
+ salt, err := generateSecureRandomData(aesNonceLength) // never use more than 2^32 random nonces with a given key
if err != nil {
return err
- } else if !validateSymmetricKey(salt) {
- return errors.New("crypto/rand failed to generate salt")
}
-
- msg.Raw = append(aesgcm.Seal(nil, salt, msg.Raw, nil), salt...)
+ encrypted := aesgcm.Seal(nil, salt, msg.Raw, nil)
+ msg.Raw = append(encrypted, salt...)
return nil
}
+// generateSecureRandomData generates random data where extra security is required.
+// The purpose of this function is to prevent some bugs in software or in hardware
+// from delivering not-very-random data. This is especially useful for AES nonce,
+// where true randomness does not really matter, but it is very important to have
+// a unique nonce for every message.
+func generateSecureRandomData(length int) ([]byte, error) {
+ x := make([]byte, length)
+ y := make([]byte, length)
+ res := make([]byte, length)
+
+ _, err := crand.Read(x)
+ if err != nil {
+ return nil, err
+ } else if !validateDataIntegrity(x, length) {
+ return nil, errors.New("crypto/rand failed to generate secure random data")
+ }
+ _, err = mrand.Read(y)
+ if err != nil {
+ return nil, err
+ } else if !validateDataIntegrity(y, length) {
+ return nil, errors.New("math/rand failed to generate secure random data")
+ }
+ for i := 0; i < length; i++ {
+ res[i] = x[i] ^ y[i]
+ }
+ if !validateDataIntegrity(res, length) {
+ return nil, errors.New("failed to generate secure random data")
+ }
+ return res, nil
+}
+
// Wrap bundles the message into an Envelope to transmit over the network.
func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err error) {
if options.TTL == 0 {
@@ -258,12 +264,11 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er
// decryptSymmetric decrypts a message with a topic key, using AES-GCM-256.
// nonce size should be 12 bytes (see cipher.gcmStandardNonceSize).
func (msg *ReceivedMessage) decryptSymmetric(key []byte) error {
- // In v6, symmetric messages are expected to contain the 12-byte
- // "salt" at the end of the payload.
- if len(msg.Raw) < AESNonceLength {
+ // symmetric messages are expected to contain the 12-byte nonce at the end of the payload
+ if len(msg.Raw) < aesNonceLength {
return errors.New("missing salt or invalid payload in symmetric message")
}
- salt := msg.Raw[len(msg.Raw)-AESNonceLength:]
+ salt := msg.Raw[len(msg.Raw)-aesNonceLength:]
block, err := aes.NewCipher(key)
if err != nil {
@@ -273,11 +278,7 @@ func (msg *ReceivedMessage) decryptSymmetric(key []byte) error {
if err != nil {
return err
}
- if len(salt) != aesgcm.NonceSize() {
- log.Error("decrypting the message", "AES salt size", len(salt))
- return errors.New("wrong AES salt size")
- }
- decrypted, err := aesgcm.Open(nil, salt, msg.Raw[:len(msg.Raw)-AESNonceLength], nil)
+ decrypted, err := aesgcm.Open(nil, salt, msg.Raw[:len(msg.Raw)-aesNonceLength], nil)
if err != nil {
return err
}
@@ -295,8 +296,8 @@ func (msg *ReceivedMessage) decryptAsymmetric(key *ecdsa.PrivateKey) error {
return err
}
-// Validate checks the validity and extracts the fields in case of success
-func (msg *ReceivedMessage) Validate() bool {
+// ValidateAndParse checks the message validity and extracts the fields in case of success.
+func (msg *ReceivedMessage) ValidateAndParse() bool {
end := len(msg.Raw)
if end < 1 {
return false
@@ -307,41 +308,32 @@ func (msg *ReceivedMessage) Validate() bool {
if end <= 1 {
return false
}
- msg.Signature = msg.Raw[end:]
+ msg.Signature = msg.Raw[end : end+signatureLength]
msg.Src = msg.SigToPubKey()
if msg.Src == nil {
return false
}
}
- padSize, ok := msg.extractPadding(end)
- if !ok {
- return false
+ beg := 1
+ payloadSize := 0
+ sizeOfPayloadSizeField := int(msg.Raw[0] & SizeMask) // number of bytes indicating the size of payload
+ if sizeOfPayloadSizeField != 0 {
+ payloadSize = int(bytesToUintLittleEndian(msg.Raw[beg : beg+sizeOfPayloadSizeField]))
+ if payloadSize+1 > end {
+ return false
+ }
+ beg += sizeOfPayloadSizeField
+ msg.Payload = msg.Raw[beg : beg+payloadSize]
}
- msg.Payload = msg.Raw[1+padSize : end]
+ beg += payloadSize
+ msg.Padding = msg.Raw[beg:end]
return true
}
-// extractPadding extracts the padding from raw message.
-// although we don't support sending messages with padding size
-// exceeding 255 bytes, such messages are perfectly valid, and
-// can be successfully decrypted.
-func (msg *ReceivedMessage) extractPadding(end int) (int, bool) {
- paddingSize := 0
- sz := int(msg.Raw[0] & paddingMask) // number of bytes indicating the entire size of padding (including these bytes)
- // could be zero -- it means no padding
- if sz != 0 {
- paddingSize = int(bytesToUintLittleEndian(msg.Raw[1 : 1+sz]))
- if paddingSize < sz || paddingSize+1 > end {
- return 0, false
- }
- msg.Padding = msg.Raw[1+sz : 1+paddingSize]
- }
- return paddingSize, true
-}
-
-// Recover retrieves the public key of the message signer.
+// SigToPubKey returns the public key associated to the message's
+// signature.
func (msg *ReceivedMessage) SigToPubKey() *ecdsa.PublicKey {
defer func() { recover() }() // in case of invalid signature
@@ -353,7 +345,7 @@ func (msg *ReceivedMessage) SigToPubKey() *ecdsa.PublicKey {
return pub
}
-// hash calculates the SHA3 checksum of the message flags, payload and padding.
+// hash calculates the SHA3 checksum of the message flags, payload size field, payload and padding.
func (msg *ReceivedMessage) hash() []byte {
if isMessageSigned(msg.Raw[0]) {
sz := len(msg.Raw) - signatureLength
diff --git a/whisper/whisperv6/message_test.go b/whisper/whisperv6/message_test.go
index c90bcc01e..0a5c1c853 100644
--- a/whisper/whisperv6/message_test.go
+++ b/whisper/whisperv6/message_test.go
@@ -18,9 +18,12 @@ package whisperv6
import (
"bytes"
+ "crypto/aes"
+ "crypto/cipher"
mrand "math/rand"
"testing"
+ "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -90,8 +93,8 @@ func singleMessageTest(t *testing.T, symmetric bool) {
t.Fatalf("failed to encrypt with seed %d: %s.", seed, err)
}
- if !decrypted.Validate() {
- t.Fatalf("failed to validate with seed %d.", seed)
+ if !decrypted.ValidateAndParse() {
+ t.Fatalf("failed to validate with seed %d, symmetric = %v.", seed, symmetric)
}
if !bytes.Equal(text, decrypted.Payload) {
@@ -206,7 +209,7 @@ func TestEnvelopeOpen(t *testing.T) {
InitSingleTest()
var symmetric bool
- for i := 0; i < 256; i++ {
+ for i := 0; i < 32; i++ {
singleEnvelopeOpenTest(t, symmetric)
symmetric = !symmetric
}
@@ -417,30 +420,6 @@ func TestPadding(t *testing.T) {
}
}
-func TestPaddingAppendedToSymMessages(t *testing.T) {
- params := &MessageParams{
- Payload: make([]byte, 246),
- KeySym: make([]byte, aesKeyLength),
- }
-
- // Simulate a message with a payload just under 256 so that
- // payload + flag + aesnonce > 256. Check that the result
- // is padded on the next 256 boundary.
- msg := sentMessage{}
- msg.Raw = make([]byte, len(params.Payload)+1+AESNonceLength)
-
- err := msg.appendPadding(params)
-
- if err != nil {
- t.Fatalf("Error appending padding to message %v", err)
- return
- }
-
- if len(msg.Raw) != 512 {
- t.Errorf("Invalid size %d != 512", len(msg.Raw))
- }
-}
-
func TestPaddingAppendedToSymMessagesWithSignature(t *testing.T) {
params := &MessageParams{
Payload: make([]byte, 246),
@@ -456,10 +435,11 @@ func TestPaddingAppendedToSymMessagesWithSignature(t *testing.T) {
params.Src = pSrc
// Simulate a message with a payload just under 256 so that
- // payload + flag + aesnonce > 256. Check that the result
+ // payload + flag + signature > 256. Check that the result
// is padded on the next 256 boundary.
msg := sentMessage{}
- msg.Raw = make([]byte, len(params.Payload)+1+AESNonceLength+signatureLength)
+ const payloadSizeFieldMinSize = 1
+ msg.Raw = make([]byte, flagsLength+payloadSizeFieldMinSize+len(params.Payload))
err = msg.appendPadding(params)
@@ -468,7 +448,24 @@ func TestPaddingAppendedToSymMessagesWithSignature(t *testing.T) {
return
}
- if len(msg.Raw) != 512 {
+ if len(msg.Raw) != 512-signatureLength {
t.Errorf("Invalid size %d != 512", len(msg.Raw))
}
}
+
+func TestAesNonce(t *testing.T) {
+ key := hexutil.MustDecode("0x03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31")
+ block, err := aes.NewCipher(key)
+ if err != nil {
+ t.Fatalf("NewCipher failed: %s", err)
+ }
+ aesgcm, err := cipher.NewGCM(block)
+ if err != nil {
+ t.Fatalf("NewGCM failed: %s", err)
+ }
+ // This is the most important single test in this package.
+ // If it fails, whisper will not be working.
+ if aesgcm.NonceSize() != aesNonceLength {
+ t.Fatalf("Nonce size is wrong. This is a critical error. Apparently AES nonce size have changed in the new version of AES GCM package. Whisper will not be working until this problem is resolved.")
+ }
+}
diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go
index 65e0c77b0..4ef0f3c43 100644
--- a/whisper/whisperv6/peer.go
+++ b/whisper/whisperv6/peer.go
@@ -28,7 +28,7 @@ import (
set "gopkg.in/fatih/set.v0"
)
-// peer represents a whisper protocol peer connection.
+// Peer represents a whisper protocol peer connection.
type Peer struct {
host *Whisper
peer *p2p.Peer
@@ -36,6 +36,8 @@ type Peer struct {
trusted bool
powRequirement float64
+ bloomFilter []byte
+ fullNode bool
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
@@ -52,56 +54,86 @@ func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
powRequirement: 0.0,
known: set.New(),
quit: make(chan struct{}),
+ bloomFilter: makeFullNodeBloom(),
+ fullNode: true,
}
}
// start initiates the peer updater, periodically broadcasting the whisper packets
// into the network.
-func (p *Peer) start() {
- go p.update()
- log.Trace("start", "peer", p.ID())
+func (peer *Peer) start() {
+ go peer.update()
+ log.Trace("start", "peer", peer.ID())
}
// stop terminates the peer updater, stopping message forwarding to it.
-func (p *Peer) stop() {
- close(p.quit)
- log.Trace("stop", "peer", p.ID())
+func (peer *Peer) stop() {
+ close(peer.quit)
+ log.Trace("stop", "peer", peer.ID())
}
// handshake sends the protocol initiation status message to the remote peer and
// verifies the remote status too.
-func (p *Peer) handshake() error {
+func (peer *Peer) handshake() error {
// Send the handshake status message asynchronously
errc := make(chan error, 1)
go func() {
- errc <- p2p.Send(p.ws, statusCode, ProtocolVersion)
+ pow := peer.host.MinPow()
+ powConverted := math.Float64bits(pow)
+ bloom := peer.host.BloomFilter()
+ errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom)
}()
+
// Fetch the remote status packet and verify protocol match
- packet, err := p.ws.ReadMsg()
+ packet, err := peer.ws.ReadMsg()
if err != nil {
return err
}
if packet.Code != statusCode {
- return fmt.Errorf("peer [%x] sent packet %x before status packet", p.ID(), packet.Code)
+ return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
}
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
+ _, err = s.List()
+ if err != nil {
+ return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
+ }
peerVersion, err := s.Uint()
if err != nil {
- return fmt.Errorf("peer [%x] sent bad status message: %v", p.ID(), err)
+ return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
}
if peerVersion != ProtocolVersion {
- return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", p.ID(), peerVersion, ProtocolVersion)
+ return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
+ }
+
+ // only version is mandatory, subsequent parameters are optional
+ powRaw, err := s.Uint()
+ if err == nil {
+ pow := math.Float64frombits(powRaw)
+ if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
+ return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
+ }
+ peer.powRequirement = pow
+
+ var bloom []byte
+ err = s.Decode(&bloom)
+ if err == nil {
+ sz := len(bloom)
+ if sz != bloomFilterSize && sz != 0 {
+ return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
+ }
+ peer.setBloomFilter(bloom)
+ }
}
- // Wait until out own status is consumed too
+
if err := <-errc; err != nil {
- return fmt.Errorf("peer [%x] failed to send status packet: %v", p.ID(), err)
+ return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
}
return nil
}
// update executes periodic operations on the peer, including message transmission
// and expiration.
-func (p *Peer) update() {
+func (peer *Peer) update() {
// Start the tickers for the updates
expire := time.NewTicker(expirationCycle)
transmit := time.NewTicker(transmissionCycle)
@@ -110,15 +142,15 @@ func (p *Peer) update() {
for {
select {
case <-expire.C:
- p.expire()
+ peer.expire()
case <-transmit.C:
- if err := p.broadcast(); err != nil {
- log.Trace("broadcast failed", "reason", err, "peer", p.ID())
+ if err := peer.broadcast(); err != nil {
+ log.Trace("broadcast failed", "reason", err, "peer", peer.ID())
return
}
- case <-p.quit:
+ case <-peer.quit:
return
}
}
@@ -152,24 +184,24 @@ func (peer *Peer) expire() {
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
-func (p *Peer) broadcast() error {
- envelopes := p.host.Envelopes()
+func (peer *Peer) broadcast() error {
+ envelopes := peer.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {
- if !p.marked(envelope) && envelope.PoW() >= p.powRequirement {
+ if !peer.marked(envelope) && envelope.PoW() >= peer.powRequirement && peer.bloomMatch(envelope) {
bundle = append(bundle, envelope)
}
}
if len(bundle) > 0 {
// transmit the batch of envelopes
- if err := p2p.Send(p.ws, messagesCode, bundle); err != nil {
+ if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil {
return err
}
// mark envelopes only if they were successfully sent
for _, e := range bundle {
- p.mark(e)
+ peer.mark(e)
}
log.Trace("broadcast", "num. messages", len(bundle))
@@ -177,12 +209,37 @@ func (p *Peer) broadcast() error {
return nil
}
-func (p *Peer) ID() []byte {
- id := p.peer.ID()
+// ID returns a peer's id
+func (peer *Peer) ID() []byte {
+ id := peer.peer.ID()
return id[:]
}
-func (p *Peer) notifyAboutPowRequirementChange(pow float64) error {
+func (peer *Peer) notifyAboutPowRequirementChange(pow float64) error {
i := math.Float64bits(pow)
- return p2p.Send(p.ws, powRequirementCode, i)
+ return p2p.Send(peer.ws, powRequirementCode, i)
+}
+
+func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
+ return p2p.Send(peer.ws, bloomFilterExCode, bloom)
+}
+
+func (peer *Peer) bloomMatch(env *Envelope) bool {
+ return peer.fullNode || bloomFilterMatch(peer.bloomFilter, env.Bloom())
+}
+
+func (peer *Peer) setBloomFilter(bloom []byte) {
+ peer.bloomFilter = bloom
+ peer.fullNode = isFullNode(bloom)
+ if peer.fullNode && peer.bloomFilter == nil {
+ peer.bloomFilter = makeFullNodeBloom()
+ }
+}
+
+func makeFullNodeBloom() []byte {
+ bloom := make([]byte, bloomFilterSize)
+ for i := 0; i < bloomFilterSize; i++ {
+ bloom[i] = 0xFF
+ }
+ return bloom
}
diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go
index 599a479be..9ce5eed8b 100644
--- a/whisper/whisperv6/peer_test.go
+++ b/whisper/whisperv6/peer_test.go
@@ -20,19 +20,21 @@ import (
"bytes"
"crypto/ecdsa"
"fmt"
+ mrand "math/rand"
"net"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/nat"
)
-var keys []string = []string{
+var keys = []string{
"d49dcf37238dc8a7aac57dc61b9fee68f0a97f062968978b9fafa7d1033d03a9",
"73fd6143c48e80ed3c56ea159fe7494a0b6b393a392227b422f4c3e8f1b54f98",
"119dd32adb1daa7a4c7bf77f847fb28730785aa92947edf42fdd997b54de40dc",
@@ -68,9 +70,8 @@ var keys []string = []string{
"7184c1701569e3a4c4d2ddce691edd983b81e42e09196d332e1ae2f1e062cff4",
}
-const NumNodes = 16 // must not exceed the number of keys (32)
-
type TestData struct {
+ started int
counter [NumNodes]int
mutex sync.RWMutex
}
@@ -79,21 +80,32 @@ type TestNode struct {
shh *Whisper
id *ecdsa.PrivateKey
server *p2p.Server
- filerId string
+ filerID string
}
+const NumNodes = 8 // must not exceed the number of keys (32)
+
var result TestData
var nodes [NumNodes]*TestNode
-var sharedKey []byte = []byte("some arbitrary data here")
-var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
-var expectedMessage []byte = []byte("per rectum ad astra")
+var sharedKey = hexutil.MustDecode("0x03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31")
+var wrongKey = hexutil.MustDecode("0xf91156714d7ec88d3edc1c652c2181dbb3044e8771c683f3b30d33c12b986b11")
+var sharedTopic = TopicType{0xF, 0x1, 0x2, 0}
+var wrongTopic = TopicType{0, 0, 0, 0}
+var expectedMessage = []byte("per aspera ad astra")
+var unexpectedMessage = []byte("per rectum ad astra")
+var masterBloomFilter []byte
+var masterPow = 0.00000001
+var round = 1
+var debugMode = false
+var prevTime time.Time
+var cntPrev int
func TestSimulation(t *testing.T) {
// create a chain of whisper nodes,
// installs the filters with shared (predefined) parameters
initialize(t)
- // each node sends a number of random (undecryptable) messages
+ // each node sends one random (not decryptable) message
for i := 0; i < NumNodes; i++ {
sendMsg(t, false, i)
}
@@ -104,8 +116,12 @@ func TestSimulation(t *testing.T) {
// check if each node have received and decrypted exactly one message
checkPropagation(t, true)
- // send protocol-level messages (powRequirementCode) and check the new PoW requirement values
- powReqExchange(t)
+ // check if Status message was correctly decoded
+ checkBloomFilterExchange(t)
+ checkPowExchange(t)
+
+ // send new pow and bloom exchange messages
+ resetParams(t)
// node #1 sends one expected (decryptable) message
sendMsg(t, true, 1)
@@ -113,24 +129,68 @@ func TestSimulation(t *testing.T) {
// check if each node (except node #0) have received and decrypted exactly one message
checkPropagation(t, false)
+ // check if corresponding protocol-level messages were correctly decoded
+ checkPowExchangeForNodeZero(t)
+ checkBloomFilterExchange(t)
+
stopServers()
}
+func resetParams(t *testing.T) {
+ // change pow only for node zero
+ masterPow = 7777777.0
+ nodes[0].shh.SetMinimumPoW(masterPow)
+
+ // change bloom for all nodes
+ masterBloomFilter = TopicToBloom(sharedTopic)
+ for i := 0; i < NumNodes; i++ {
+ nodes[i].shh.SetBloomFilter(masterBloomFilter)
+ }
+
+ round++
+}
+
+func initBloom(t *testing.T) {
+ masterBloomFilter = make([]byte, bloomFilterSize)
+ _, err := mrand.Read(masterBloomFilter)
+ if err != nil {
+ t.Fatalf("rand failed: %s.", err)
+ }
+
+ msgBloom := TopicToBloom(sharedTopic)
+ masterBloomFilter = addBloom(masterBloomFilter, msgBloom)
+ for i := 0; i < 32; i++ {
+ masterBloomFilter[i] = 0xFF
+ }
+
+ if !bloomFilterMatch(masterBloomFilter, msgBloom) {
+ t.Fatalf("bloom mismatch on initBloom.")
+ }
+}
+
func initialize(t *testing.T) {
+ initBloom(t)
+
var err error
ip := net.IPv4(127, 0, 0, 1)
port0 := 30303
for i := 0; i < NumNodes; i++ {
var node TestNode
+ b := make([]byte, bloomFilterSize)
+ copy(b, masterBloomFilter)
node.shh = New(&DefaultConfig)
- node.shh.SetMinimumPowTest(0.00000001)
+ node.shh.SetMinimumPoW(masterPow)
+ node.shh.SetBloomFilter(b)
+ if !bytes.Equal(node.shh.BloomFilter(), masterBloomFilter) {
+ t.Fatalf("bloom mismatch on init.")
+ }
node.shh.Start(nil)
topics := make([]TopicType, 0)
topics = append(topics, sharedTopic)
f := Filter{KeySym: sharedKey}
f.Topics = [][]byte{topics[0][:]}
- node.filerId, err = node.shh.Subscribe(&f)
+ node.filerID, err = node.shh.Subscribe(&f)
if err != nil {
t.Fatalf("failed to install the filter: %s.", err)
}
@@ -143,9 +203,9 @@ func initialize(t *testing.T) {
name := common.MakeName("whisper-go", "2.0")
var peers []*discover.Node
if i > 0 {
- peerNodeId := nodes[i-1].id
+ peerNodeID := nodes[i-1].id
peerPort := uint16(port - 1)
- peerNode := discover.PubkeyID(&peerNodeId.PublicKey)
+ peerNode := discover.PubkeyID(&peerNodeID.PublicKey)
peer := discover.NewNode(peerNode, ip, peerPort, peerPort)
peers = append(peers, peer)
}
@@ -167,22 +227,29 @@ func initialize(t *testing.T) {
nodes[i] = &node
}
- for i := 1; i < NumNodes; i++ {
- go nodes[i].server.Start()
+ for i := 0; i < NumNodes; i++ {
+ go startServer(t, nodes[i].server)
}
- // we need to wait until the first node actually starts
- err = nodes[0].server.Start()
+ waitForServersToStart(t)
+}
+
+func startServer(t *testing.T, s *p2p.Server) {
+ err := s.Start()
if err != nil {
t.Fatalf("failed to start the fisrt server.")
}
+
+ result.mutex.Lock()
+ defer result.mutex.Unlock()
+ result.started++
}
func stopServers() {
for i := 0; i < NumNodes; i++ {
n := nodes[i]
if n != nil {
- n.shh.Unsubscribe(n.filerId)
+ n.shh.Unsubscribe(n.filerID)
n.shh.Stop()
n.server.Stop()
}
@@ -194,8 +261,10 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
return
}
- const cycle = 50
- const iterations = 200
+ prevTime = time.Now()
+ // (cycle * iterations) should not exceed 50 seconds, since TTL=50
+ const cycle = 200 // time in milliseconds
+ const iterations = 250
first := 0
if !includingNodeZero {
@@ -204,35 +273,35 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
for j := 0; j < iterations; j++ {
for i := first; i < NumNodes; i++ {
- f := nodes[i].shh.GetFilter(nodes[i].filerId)
+ f := nodes[i].shh.GetFilter(nodes[i].filerID)
if f == nil {
- t.Fatalf("failed to get filterId %s from node %d.", nodes[i].filerId, i)
+ t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerID, i, round)
}
mail := f.Retrieve()
- if !validateMail(t, i, mail) {
- return
- }
+ validateMail(t, i, mail)
if isTestComplete() {
+ checkTestStatus()
return
}
}
+ checkTestStatus()
time.Sleep(cycle * time.Millisecond)
}
- t.Fatalf("Test was not complete: timeout %d seconds.", iterations*cycle/1000)
-
if !includingNodeZero {
- f := nodes[0].shh.GetFilter(nodes[0].filerId)
+ f := nodes[0].shh.GetFilter(nodes[0].filerID)
if f != nil {
t.Fatalf("node zero received a message with low PoW.")
}
}
+
+ t.Fatalf("Test was not complete (%d round): timeout %d seconds. nodes=%v", round, iterations*cycle/1000, nodes)
}
-func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
+func validateMail(t *testing.T, index int, mail []*ReceivedMessage) {
var cnt int
for _, m := range mail {
if bytes.Equal(m.Payload, expectedMessage) {
@@ -242,14 +311,13 @@ func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
if cnt == 0 {
// no messages received yet: nothing is wrong
- return true
+ return
}
if cnt > 1 {
t.Fatalf("node %d received %d.", index, cnt)
- return false
}
- if cnt > 0 {
+ if cnt == 1 {
result.mutex.Lock()
defer result.mutex.Unlock()
result.counter[index] += cnt
@@ -257,7 +325,28 @@ func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
t.Fatalf("node %d accumulated %d.", index, result.counter[index])
}
}
- return true
+}
+
+func checkTestStatus() {
+ var cnt int
+ var arr [NumNodes]int
+
+ for i := 0; i < NumNodes; i++ {
+ arr[i] = nodes[i].server.PeerCount()
+ envelopes := nodes[i].shh.Envelopes()
+ if len(envelopes) >= NumNodes {
+ cnt++
+ }
+ }
+
+ if debugMode {
+ if cntPrev != cnt {
+ fmt.Printf(" %v \t number of nodes that have received all msgs: %d, number of peers per node: %v \n",
+ time.Since(prevTime), cnt, arr)
+ prevTime = time.Now()
+ cntPrev = cnt
+ }
+ }
}
func isTestComplete() bool {
@@ -272,7 +361,7 @@ func isTestComplete() bool {
for i := 0; i < NumNodes; i++ {
envelopes := nodes[i].shh.Envelopes()
- if len(envelopes) < 2 {
+ if len(envelopes) < NumNodes+1 {
return false
}
}
@@ -287,9 +376,10 @@ func sendMsg(t *testing.T, expected bool, id int) {
opt := MessageParams{KeySym: sharedKey, Topic: sharedTopic, Payload: expectedMessage, PoW: 0.00000001, WorkTime: 1}
if !expected {
- opt.KeySym[0]++
- opt.Topic[0]++
- opt.Payload = opt.Payload[1:]
+ opt.KeySym = wrongKey
+ opt.Topic = wrongTopic
+ opt.Payload = unexpectedMessage
+ opt.Payload[0] = byte(id)
}
msg, err := NewSentMessage(&opt)
@@ -332,34 +422,89 @@ func TestPeerBasic(t *testing.T) {
}
}
-func powReqExchange(t *testing.T) {
+func checkPowExchangeForNodeZero(t *testing.T) {
+ const iterations = 200
+ for j := 0; j < iterations; j++ {
+ lastCycle := (j == iterations-1)
+ ok := checkPowExchangeForNodeZeroOnce(t, lastCycle)
+ if ok {
+ break
+ }
+ time.Sleep(50 * time.Millisecond)
+ }
+}
+
+func checkPowExchangeForNodeZeroOnce(t *testing.T, mustPass bool) bool {
+ cnt := 0
for i, node := range nodes {
for peer := range node.shh.peers {
- if peer.powRequirement > 1000.0 {
- t.Fatalf("node %d: one of the peers' pow requirement is too big (%f).", i, peer.powRequirement)
+ if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
+ cnt++
+ if peer.powRequirement != masterPow {
+ if mustPass {
+ t.Fatalf("node %d: failed to set the new pow requirement for node zero.", i)
+ } else {
+ return false
+ }
+ }
}
}
}
+ if cnt == 0 {
+ t.Fatalf("looking for node zero: no matching peers found.")
+ }
+ return true
+}
- const pow float64 = 7777777.0
- nodes[0].shh.SetMinimumPoW(pow)
-
- // wait until all the messages are delivered
- time.Sleep(64 * time.Millisecond)
+func checkPowExchange(t *testing.T) {
+ for i, node := range nodes {
+ for peer := range node.shh.peers {
+ if peer.peer.ID() != discover.PubkeyID(&nodes[0].id.PublicKey) {
+ if peer.powRequirement != masterPow {
+ t.Fatalf("node %d: failed to exchange pow requirement in round %d; expected %f, got %f",
+ i, round, masterPow, peer.powRequirement)
+ }
+ }
+ }
+ }
+}
- cnt := 0
+func checkBloomFilterExchangeOnce(t *testing.T, mustPass bool) bool {
for i, node := range nodes {
for peer := range node.shh.peers {
- if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
- cnt++
- if peer.powRequirement != pow {
- t.Fatalf("node %d: failed to set the new pow requirement.", i)
+ if !bytes.Equal(peer.bloomFilter, masterBloomFilter) {
+ if mustPass {
+ t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got",
+ i, round, masterBloomFilter, peer.bloomFilter)
+ } else {
+ return false
}
}
}
}
- if cnt == 0 {
- t.Fatalf("no matching peers found.")
+ return true
+}
+
+func checkBloomFilterExchange(t *testing.T) {
+ const iterations = 200
+ for j := 0; j < iterations; j++ {
+ lastCycle := (j == iterations-1)
+ ok := checkBloomFilterExchangeOnce(t, lastCycle)
+ if ok {
+ break
+ }
+ time.Sleep(50 * time.Millisecond)
+ }
+}
+
+func waitForServersToStart(t *testing.T) {
+ const iterations = 200
+ for j := 0; j < iterations; j++ {
+ time.Sleep(50 * time.Millisecond)
+ if result.started == NumNodes {
+ return
+ }
}
+ t.Fatalf("Failed to start all the servers, running: %d", result.started)
}
diff --git a/whisper/whisperv6/topic.go b/whisper/whisperv6/topic.go
index bf5da01e3..4dd8f283c 100644
--- a/whisper/whisperv6/topic.go
+++ b/whisper/whisperv6/topic.go
@@ -23,11 +23,13 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
)
-// Topic represents a cryptographically secure, probabilistic partial
+// TopicType represents a cryptographically secure, probabilistic partial
// classifications of a message, determined as the first (left) 4 bytes of the
// SHA3 hash of some arbitrary data given by the original author of the message.
type TopicType [TopicLength]byte
+// BytesToTopic converts from the byte array representation of a topic
+// into the TopicType type.
func BytesToTopic(b []byte) (t TopicType) {
sz := TopicLength
if x := len(b); x < TopicLength {
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index 492591486..600f9cb28 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -19,7 +19,6 @@ package whisperv6
import (
"bytes"
"crypto/ecdsa"
- crand "crypto/rand"
"crypto/sha256"
"fmt"
"math"
@@ -39,6 +38,8 @@ import (
set "gopkg.in/fatih/set.v0"
)
+// Statistics holds several message-related counter for analytics
+// purposes.
type Statistics struct {
messagesCleared int
memoryCleared int
@@ -48,9 +49,12 @@ type Statistics struct {
}
const (
- minPowIdx = iota // Minimal PoW required by the whisper node
- maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
- overflowIdx = iota // Indicator of message queue overflow
+ maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
+ overflowIdx // Indicator of message queue overflow
+ minPowIdx // Minimal PoW required by the whisper node
+ minPowToleranceIdx // Minimal PoW tolerated by the whisper node for a limited time
+ bloomFilterIdx // Bloom filter for topics of interest for this node
+ bloomFilterToleranceIdx // Bloom filter tolerated by the whisper node for a limited time
)
// Whisper represents a dark communication interface through the Ethereum
@@ -76,7 +80,7 @@ type Whisper struct {
settings syncmap.Map // holds configuration settings that can be dynamically changed
- reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages
+ syncAllowance int // maximum time in seconds allowed to process the whisper-related messages
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
@@ -91,15 +95,15 @@ func New(cfg *Config) *Whisper {
}
whisper := &Whisper{
- privateKeys: make(map[string]*ecdsa.PrivateKey),
- symKeys: make(map[string][]byte),
- envelopes: make(map[common.Hash]*Envelope),
- expirations: make(map[uint32]*set.SetNonTS),
- peers: make(map[*Peer]struct{}),
- messageQueue: make(chan *Envelope, messageQueueLimit),
- p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
- quit: make(chan struct{}),
- reactionAllowance: SynchAllowance,
+ privateKeys: make(map[string]*ecdsa.PrivateKey),
+ symKeys: make(map[string][]byte),
+ envelopes: make(map[common.Hash]*Envelope),
+ expirations: make(map[uint32]*set.SetNonTS),
+ peers: make(map[*Peer]struct{}),
+ messageQueue: make(chan *Envelope, messageQueueLimit),
+ p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
+ quit: make(chan struct{}),
+ syncAllowance: DefaultSyncAllowance,
}
whisper.filters = NewFilters(whisper)
@@ -126,30 +130,74 @@ func New(cfg *Config) *Whisper {
return whisper
}
-func (w *Whisper) MinPow() float64 {
- val, _ := w.settings.Load(minPowIdx)
+// MinPow returns the PoW value required by this node.
+func (whisper *Whisper) MinPow() float64 {
+ val, exist := whisper.settings.Load(minPowIdx)
+ if !exist || val == nil {
+ return DefaultMinimumPoW
+ }
+ v, ok := val.(float64)
+ if !ok {
+ log.Error("Error loading minPowIdx, using default")
+ return DefaultMinimumPoW
+ }
+ return v
+}
+
+// MinPowTolerance returns the value of minimum PoW which is tolerated for a limited
+// time after PoW was changed. If sufficient time have elapsed or no change of PoW
+// have ever occurred, the return value will be the same as return value of MinPow().
+func (whisper *Whisper) MinPowTolerance() float64 {
+ val, exist := whisper.settings.Load(minPowToleranceIdx)
+ if !exist || val == nil {
+ return DefaultMinimumPoW
+ }
return val.(float64)
}
+// BloomFilter returns the aggregated bloom filter for all the topics of interest.
+// The nodes are required to send only messages that match the advertised bloom filter.
+// If a message does not match the bloom, it will tantamount to spam, and the peer will
+// be disconnected.
+func (whisper *Whisper) BloomFilter() []byte {
+ val, exist := whisper.settings.Load(bloomFilterIdx)
+ if !exist || val == nil {
+ return nil
+ }
+ return val.([]byte)
+}
+
+// BloomFilterTolerance returns the bloom filter which is tolerated for a limited
+// time after new bloom was advertised to the peers. If sufficient time have elapsed
+// or no change of bloom filter have ever occurred, the return value will be the same
+// as return value of BloomFilter().
+func (whisper *Whisper) BloomFilterTolerance() []byte {
+ val, exist := whisper.settings.Load(bloomFilterToleranceIdx)
+ if !exist || val == nil {
+ return nil
+ }
+ return val.([]byte)
+}
+
// MaxMessageSize returns the maximum accepted message size.
-func (w *Whisper) MaxMessageSize() uint32 {
- val, _ := w.settings.Load(maxMsgSizeIdx)
+func (whisper *Whisper) MaxMessageSize() uint32 {
+ val, _ := whisper.settings.Load(maxMsgSizeIdx)
return val.(uint32)
}
// Overflow returns an indication if the message queue is full.
-func (w *Whisper) Overflow() bool {
- val, _ := w.settings.Load(overflowIdx)
+func (whisper *Whisper) Overflow() bool {
+ val, _ := whisper.settings.Load(overflowIdx)
return val.(bool)
}
// APIs returns the RPC descriptors the Whisper implementation offers
-func (w *Whisper) APIs() []rpc.API {
+func (whisper *Whisper) APIs() []rpc.API {
return []rpc.API{
{
Namespace: ProtocolName,
Version: ProtocolVersionStr,
- Service: NewPublicWhisperAPI(w),
+ Service: NewPublicWhisperAPI(whisper),
Public: true,
},
}
@@ -157,63 +205,77 @@ func (w *Whisper) APIs() []rpc.API {
// RegisterServer registers MailServer interface.
// MailServer will process all the incoming messages with p2pRequestCode.
-func (w *Whisper) RegisterServer(server MailServer) {
- w.mailServer = server
+func (whisper *Whisper) RegisterServer(server MailServer) {
+ whisper.mailServer = server
}
// Protocols returns the whisper sub-protocols ran by this particular client.
-func (w *Whisper) Protocols() []p2p.Protocol {
- return []p2p.Protocol{w.protocol}
+func (whisper *Whisper) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{whisper.protocol}
}
// Version returns the whisper sub-protocols version number.
-func (w *Whisper) Version() uint {
- return w.protocol.Version
+func (whisper *Whisper) Version() uint {
+ return whisper.protocol.Version
}
// SetMaxMessageSize sets the maximal message size allowed by this node
-func (w *Whisper) SetMaxMessageSize(size uint32) error {
+func (whisper *Whisper) SetMaxMessageSize(size uint32) error {
if size > MaxMessageSize {
return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize)
}
- w.settings.Store(maxMsgSizeIdx, size)
+ whisper.settings.Store(maxMsgSizeIdx, size)
+ return nil
+}
+
+// SetBloomFilter sets the new bloom filter
+func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
+ if len(bloom) != bloomFilterSize {
+ return fmt.Errorf("invalid bloom filter size: %d", len(bloom))
+ }
+
+ b := make([]byte, bloomFilterSize)
+ copy(b, bloom)
+
+ whisper.settings.Store(bloomFilterIdx, b)
+ whisper.notifyPeersAboutBloomFilterChange(b)
+
+ go func() {
+ // allow some time before all the peers have processed the notification
+ time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
+ whisper.settings.Store(bloomFilterToleranceIdx, b)
+ }()
+
return nil
}
// SetMinimumPoW sets the minimal PoW required by this node
-func (w *Whisper) SetMinimumPoW(val float64) error {
+func (whisper *Whisper) SetMinimumPoW(val float64) error {
if val < 0.0 {
return fmt.Errorf("invalid PoW: %f", val)
}
- w.notifyPeersAboutPowRequirementChange(val)
+ whisper.settings.Store(minPowIdx, val)
+ whisper.notifyPeersAboutPowRequirementChange(val)
go func() {
// allow some time before all the peers have processed the notification
- time.Sleep(time.Duration(w.reactionAllowance) * time.Second)
- w.settings.Store(minPowIdx, val)
+ time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
+ whisper.settings.Store(minPowToleranceIdx, val)
}()
return nil
}
-// SetMinimumPoW sets the minimal PoW in test environment
-func (w *Whisper) SetMinimumPowTest(val float64) {
- w.notifyPeersAboutPowRequirementChange(val)
- w.settings.Store(minPowIdx, val)
+// SetMinimumPowTest sets the minimal PoW in test environment
+func (whisper *Whisper) SetMinimumPowTest(val float64) {
+ whisper.settings.Store(minPowIdx, val)
+ whisper.notifyPeersAboutPowRequirementChange(val)
+ whisper.settings.Store(minPowToleranceIdx, val)
}
-func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
- arr := make([]*Peer, len(w.peers))
- i := 0
-
- w.peerMu.Lock()
- for p := range w.peers {
- arr[i] = p
- i++
- }
- w.peerMu.Unlock()
-
+func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
+ arr := whisper.getPeers()
for _, p := range arr {
err := p.notifyAboutPowRequirementChange(pow)
if err != nil {
@@ -221,16 +283,42 @@ func (w *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
err = p.notifyAboutPowRequirementChange(pow)
}
if err != nil {
- log.Warn("oversized message received", "peer", p.ID(), "error", err)
+ log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err)
}
}
}
+func (whisper *Whisper) notifyPeersAboutBloomFilterChange(bloom []byte) {
+ arr := whisper.getPeers()
+ for _, p := range arr {
+ err := p.notifyAboutBloomFilterChange(bloom)
+ if err != nil {
+ // allow one retry
+ err = p.notifyAboutBloomFilterChange(bloom)
+ }
+ if err != nil {
+ log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err)
+ }
+ }
+}
+
+func (whisper *Whisper) getPeers() []*Peer {
+ arr := make([]*Peer, len(whisper.peers))
+ i := 0
+ whisper.peerMu.Lock()
+ for p := range whisper.peers {
+ arr[i] = p
+ i++
+ }
+ whisper.peerMu.Unlock()
+ return arr
+}
+
// getPeer retrieves peer by ID
-func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
- w.peerMu.Lock()
- defer w.peerMu.Unlock()
- for p := range w.peers {
+func (whisper *Whisper) getPeer(peerID []byte) (*Peer, error) {
+ whisper.peerMu.Lock()
+ defer whisper.peerMu.Unlock()
+ for p := range whisper.peers {
id := p.peer.ID()
if bytes.Equal(peerID, id[:]) {
return p, nil
@@ -241,8 +329,8 @@ func (w *Whisper) getPeer(peerID []byte) (*Peer, error) {
// AllowP2PMessagesFromPeer marks specific peer trusted,
// which will allow it to send historic (expired) messages.
-func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
- p, err := w.getPeer(peerID)
+func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
+ p, err := whisper.getPeer(peerID)
if err != nil {
return err
}
@@ -255,8 +343,8 @@ func (w *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
// request and respond with a number of peer-to-peer messages (possibly expired),
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
-func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error {
- p, err := w.getPeer(peerID)
+func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error {
+ p, err := whisper.getPeer(peerID)
if err != nil {
return err
}
@@ -265,22 +353,22 @@ func (w *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) err
}
// SendP2PMessage sends a peer-to-peer message to a specific peer.
-func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
- p, err := w.getPeer(peerID)
+func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
+ p, err := whisper.getPeer(peerID)
if err != nil {
return err
}
- return w.SendP2PDirect(p, envelope)
+ return whisper.SendP2PDirect(p, envelope)
}
// SendP2PDirect sends a peer-to-peer message to a specific peer.
-func (w *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
+func (whisper *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error {
return p2p.Send(peer.ws, p2pMessageCode, envelope)
}
// NewKeyPair generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption. Returns ID of the new key pair.
-func (w *Whisper) NewKeyPair() (string, error) {
+func (whisper *Whisper) NewKeyPair() (string, error) {
key, err := crypto.GenerateKey()
if err != nil || !validatePrivateKey(key) {
key, err = crypto.GenerateKey() // retry once
@@ -297,55 +385,55 @@ func (w *Whisper) NewKeyPair() (string, error) {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
- w.keyMu.Lock()
- defer w.keyMu.Unlock()
+ whisper.keyMu.Lock()
+ defer whisper.keyMu.Unlock()
- if w.privateKeys[id] != nil {
+ if whisper.privateKeys[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
- w.privateKeys[id] = key
+ whisper.privateKeys[id] = key
return id, nil
}
// DeleteKeyPair deletes the specified key if it exists.
-func (w *Whisper) DeleteKeyPair(key string) bool {
- w.keyMu.Lock()
- defer w.keyMu.Unlock()
+func (whisper *Whisper) DeleteKeyPair(key string) bool {
+ whisper.keyMu.Lock()
+ defer whisper.keyMu.Unlock()
- if w.privateKeys[key] != nil {
- delete(w.privateKeys, key)
+ if whisper.privateKeys[key] != nil {
+ delete(whisper.privateKeys, key)
return true
}
return false
}
// AddKeyPair imports a asymmetric private key and returns it identifier.
-func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
+func (whisper *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
id, err := GenerateRandomID()
if err != nil {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
- w.keyMu.Lock()
- w.privateKeys[id] = key
- w.keyMu.Unlock()
+ whisper.keyMu.Lock()
+ whisper.privateKeys[id] = key
+ whisper.keyMu.Unlock()
return id, nil
}
// HasKeyPair checks if the the whisper node is configured with the private key
// of the specified public pair.
-func (w *Whisper) HasKeyPair(id string) bool {
- w.keyMu.RLock()
- defer w.keyMu.RUnlock()
- return w.privateKeys[id] != nil
+func (whisper *Whisper) HasKeyPair(id string) bool {
+ whisper.keyMu.RLock()
+ defer whisper.keyMu.RUnlock()
+ return whisper.privateKeys[id] != nil
}
// GetPrivateKey retrieves the private key of the specified identity.
-func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
- w.keyMu.RLock()
- defer w.keyMu.RUnlock()
- key := w.privateKeys[id]
+func (whisper *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
+ whisper.keyMu.RLock()
+ defer whisper.keyMu.RUnlock()
+ key := whisper.privateKeys[id]
if key == nil {
return nil, fmt.Errorf("invalid id")
}
@@ -354,12 +442,11 @@ func (w *Whisper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
// GenerateSymKey generates a random symmetric key and stores it under id,
// which is then returned. Will be used in the future for session key exchange.
-func (w *Whisper) GenerateSymKey() (string, error) {
- key := make([]byte, aesKeyLength)
- _, err := crand.Read(key)
+func (whisper *Whisper) GenerateSymKey() (string, error) {
+ key, err := generateSecureRandomData(aesKeyLength)
if err != nil {
return "", err
- } else if !validateSymmetricKey(key) {
+ } else if !validateDataIntegrity(key, aesKeyLength) {
return "", fmt.Errorf("error in GenerateSymKey: crypto/rand failed to generate random data")
}
@@ -368,18 +455,18 @@ func (w *Whisper) GenerateSymKey() (string, error) {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
- w.keyMu.Lock()
- defer w.keyMu.Unlock()
+ whisper.keyMu.Lock()
+ defer whisper.keyMu.Unlock()
- if w.symKeys[id] != nil {
+ if whisper.symKeys[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
- w.symKeys[id] = key
+ whisper.symKeys[id] = key
return id, nil
}
// AddSymKeyDirect stores the key, and returns its id.
-func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) {
+func (whisper *Whisper) AddSymKeyDirect(key []byte) (string, error) {
if len(key) != aesKeyLength {
return "", fmt.Errorf("wrong key size: %d", len(key))
}
@@ -389,23 +476,23 @@ func (w *Whisper) AddSymKeyDirect(key []byte) (string, error) {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
- w.keyMu.Lock()
- defer w.keyMu.Unlock()
+ whisper.keyMu.Lock()
+ defer whisper.keyMu.Unlock()
- if w.symKeys[id] != nil {
+ if whisper.symKeys[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
- w.symKeys[id] = key
+ whisper.symKeys[id] = key
return id, nil
}
// AddSymKeyFromPassword generates the key from password, stores it, and returns its id.
-func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) {
+func (whisper *Whisper) AddSymKeyFromPassword(password string) (string, error) {
id, err := GenerateRandomID()
if err != nil {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
- if w.HasSymKey(id) {
+ if whisper.HasSymKey(id) {
return "", fmt.Errorf("failed to generate unique ID")
}
@@ -416,60 +503,81 @@ func (w *Whisper) AddSymKeyFromPassword(password string) (string, error) {
return "", err
}
- w.keyMu.Lock()
- defer w.keyMu.Unlock()
+ whisper.keyMu.Lock()
+ defer whisper.keyMu.Unlock()
// double check is necessary, because deriveKeyMaterial() is very slow
- if w.symKeys[id] != nil {
+ if whisper.symKeys[id] != nil {
return "", fmt.Errorf("critical error: failed to generate unique ID")
}
- w.symKeys[id] = derived
+ whisper.symKeys[id] = derived
return id, nil
}
// HasSymKey returns true if there is a key associated with the given id.
// Otherwise returns false.
-func (w *Whisper) HasSymKey(id string) bool {
- w.keyMu.RLock()
- defer w.keyMu.RUnlock()
- return w.symKeys[id] != nil
+func (whisper *Whisper) HasSymKey(id string) bool {
+ whisper.keyMu.RLock()
+ defer whisper.keyMu.RUnlock()
+ return whisper.symKeys[id] != nil
}
// DeleteSymKey deletes the key associated with the name string if it exists.
-func (w *Whisper) DeleteSymKey(id string) bool {
- w.keyMu.Lock()
- defer w.keyMu.Unlock()
- if w.symKeys[id] != nil {
- delete(w.symKeys, id)
+func (whisper *Whisper) DeleteSymKey(id string) bool {
+ whisper.keyMu.Lock()
+ defer whisper.keyMu.Unlock()
+ if whisper.symKeys[id] != nil {
+ delete(whisper.symKeys, id)
return true
}
return false
}
// GetSymKey returns the symmetric key associated with the given id.
-func (w *Whisper) GetSymKey(id string) ([]byte, error) {
- w.keyMu.RLock()
- defer w.keyMu.RUnlock()
- if w.symKeys[id] != nil {
- return w.symKeys[id], nil
+func (whisper *Whisper) GetSymKey(id string) ([]byte, error) {
+ whisper.keyMu.RLock()
+ defer whisper.keyMu.RUnlock()
+ if whisper.symKeys[id] != nil {
+ return whisper.symKeys[id], nil
}
return nil, fmt.Errorf("non-existent key ID")
}
// Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages.
-func (w *Whisper) Subscribe(f *Filter) (string, error) {
- return w.filters.Install(f)
+func (whisper *Whisper) Subscribe(f *Filter) (string, error) {
+ s, err := whisper.filters.Install(f)
+ if err == nil {
+ whisper.updateBloomFilter(f)
+ }
+ return s, err
+}
+
+// updateBloomFilter recalculates the new value of bloom filter,
+// and informs the peers if necessary.
+func (whisper *Whisper) updateBloomFilter(f *Filter) {
+ aggregate := make([]byte, bloomFilterSize)
+ for _, t := range f.Topics {
+ top := BytesToTopic(t)
+ b := TopicToBloom(top)
+ aggregate = addBloom(aggregate, b)
+ }
+
+ if !bloomFilterMatch(whisper.BloomFilter(), aggregate) {
+ // existing bloom filter must be updated
+ aggregate = addBloom(whisper.BloomFilter(), aggregate)
+ whisper.SetBloomFilter(aggregate)
+ }
}
// GetFilter returns the filter by id.
-func (w *Whisper) GetFilter(id string) *Filter {
- return w.filters.Get(id)
+func (whisper *Whisper) GetFilter(id string) *Filter {
+ return whisper.filters.Get(id)
}
// Unsubscribe removes an installed message handler.
-func (w *Whisper) Unsubscribe(id string) error {
- ok := w.filters.Uninstall(id)
+func (whisper *Whisper) Unsubscribe(id string) error {
+ ok := whisper.filters.Uninstall(id)
if !ok {
return fmt.Errorf("Unsubscribe: Invalid ID")
}
@@ -478,8 +586,8 @@ func (w *Whisper) Unsubscribe(id string) error {
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
-func (w *Whisper) Send(envelope *Envelope) error {
- ok, err := w.add(envelope)
+func (whisper *Whisper) Send(envelope *Envelope) error {
+ ok, err := whisper.add(envelope)
if err != nil {
return err
}
@@ -491,13 +599,13 @@ func (w *Whisper) Send(envelope *Envelope) error {
// Start implements node.Service, starting the background data propagation thread
// of the Whisper protocol.
-func (w *Whisper) Start(*p2p.Server) error {
+func (whisper *Whisper) Start(*p2p.Server) error {
log.Info("started whisper v." + ProtocolVersionStr)
- go w.update()
+ go whisper.update()
numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ {
- go w.processQueue()
+ go whisper.processQueue()
}
return nil
@@ -505,26 +613,26 @@ func (w *Whisper) Start(*p2p.Server) error {
// Stop implements node.Service, stopping the background data propagation thread
// of the Whisper protocol.
-func (w *Whisper) Stop() error {
- close(w.quit)
+func (whisper *Whisper) Stop() error {
+ close(whisper.quit)
log.Info("whisper stopped")
return nil
}
// HandlePeer is called by the underlying P2P layer when the whisper sub-protocol
// connection is negotiated.
-func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
// Create the new peer and start tracking it
- whisperPeer := newPeer(wh, peer, rw)
+ whisperPeer := newPeer(whisper, peer, rw)
- wh.peerMu.Lock()
- wh.peers[whisperPeer] = struct{}{}
- wh.peerMu.Unlock()
+ whisper.peerMu.Lock()
+ whisper.peers[whisperPeer] = struct{}{}
+ whisper.peerMu.Unlock()
defer func() {
- wh.peerMu.Lock()
- delete(wh.peers, whisperPeer)
- wh.peerMu.Unlock()
+ whisper.peerMu.Lock()
+ delete(whisper.peers, whisperPeer)
+ whisper.peerMu.Unlock()
}()
// Run the peer handshake and state updates
@@ -534,11 +642,11 @@ func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
whisperPeer.start()
defer whisperPeer.stop()
- return wh.runMessageLoop(whisperPeer, rw)
+ return whisper.runMessageLoop(whisperPeer, rw)
}
// runMessageLoop reads and processes inbound messages directly to merge into client-global state.
-func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
+func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
for {
// fetch the next packet
packet, err := rw.ReadMsg()
@@ -546,7 +654,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("message loop", "peer", p.peer.ID(), "err", err)
return err
}
- if packet.Size > wh.MaxMessageSize() {
+ if packet.Size > whisper.MaxMessageSize() {
log.Warn("oversized message received", "peer", p.peer.ID())
return errors.New("oversized message received")
}
@@ -565,7 +673,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
trouble := false
for _, env := range envelopes {
- cached, err := wh.add(env)
+ cached, err := whisper.add(env)
if err != nil {
trouble = true
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
@@ -592,7 +700,17 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
p.powRequirement = f
case bloomFilterExCode:
- // to be implemented
+ var bloom []byte
+ err := packet.Decode(&bloom)
+ if err == nil && len(bloom) != bloomFilterSize {
+ err = fmt.Errorf("wrong bloom filter size %d", len(bloom))
+ }
+
+ if err != nil {
+ log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid bloom filter exchange message")
+ }
+ p.setBloomFilter(bloom)
case p2pMessageCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and
@@ -604,17 +722,17 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid direct message")
}
- wh.postEvent(&envelope, true)
+ whisper.postEvent(&envelope, true)
}
case p2pRequestCode:
// Must be processed if mail server is implemented. Otherwise ignore.
- if wh.mailServer != nil {
+ if whisper.mailServer != nil {
var request Envelope
if err := packet.Decode(&request); err != nil {
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid p2p request")
}
- wh.mailServer.DeliverMail(p, &request)
+ whisper.mailServer.DeliverMail(p, &request)
}
default:
// New message types might be implemented in the future versions of Whisper.
@@ -628,117 +746,126 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
-func (wh *Whisper) add(envelope *Envelope) (bool, error) {
+func (whisper *Whisper) add(envelope *Envelope) (bool, error) {
now := uint32(time.Now().Unix())
sent := envelope.Expiry - envelope.TTL
if sent > now {
- if sent-SynchAllowance > now {
+ if sent-DefaultSyncAllowance > now {
return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
- } else {
- // recalculate PoW, adjusted for the time difference, plus one second for latency
- envelope.calculatePoW(sent - now + 1)
}
+ // recalculate PoW, adjusted for the time difference, plus one second for latency
+ envelope.calculatePoW(sent - now + 1)
}
if envelope.Expiry < now {
- if envelope.Expiry+SynchAllowance*2 < now {
+ if envelope.Expiry+DefaultSyncAllowance*2 < now {
return false, fmt.Errorf("very old message")
- } else {
- log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
- return false, nil // drop envelope without error
}
+ log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
+ return false, nil // drop envelope without error
}
- if uint32(envelope.size()) > wh.MaxMessageSize() {
+ if uint32(envelope.size()) > whisper.MaxMessageSize() {
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
}
- if envelope.PoW() < wh.MinPow() {
- log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
- return false, nil // drop envelope without error for now
+ if envelope.PoW() < whisper.MinPow() {
+ // maybe the value was recently changed, and the peers did not adjust yet.
+ // in this case the previous value is retrieved by MinPowTolerance()
+ // for a short period of peer synchronization.
+ if envelope.PoW() < whisper.MinPowTolerance() {
+ return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
+ }
+ }
- // once the status message includes the PoW requirement, an error should be returned here:
- //return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
+ if !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
+ // maybe the value was recently changed, and the peers did not adjust yet.
+ // in this case the previous value is retrieved by BloomFilterTolerance()
+ // for a short period of peer synchronization.
+ if !bloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
+ return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
+ envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
+ }
}
hash := envelope.Hash()
- wh.poolMu.Lock()
- _, alreadyCached := wh.envelopes[hash]
+ whisper.poolMu.Lock()
+ _, alreadyCached := whisper.envelopes[hash]
if !alreadyCached {
- wh.envelopes[hash] = envelope
- if wh.expirations[envelope.Expiry] == nil {
- wh.expirations[envelope.Expiry] = set.NewNonTS()
+ whisper.envelopes[hash] = envelope
+ if whisper.expirations[envelope.Expiry] == nil {
+ whisper.expirations[envelope.Expiry] = set.NewNonTS()
}
- if !wh.expirations[envelope.Expiry].Has(hash) {
- wh.expirations[envelope.Expiry].Add(hash)
+ if !whisper.expirations[envelope.Expiry].Has(hash) {
+ whisper.expirations[envelope.Expiry].Add(hash)
}
}
- wh.poolMu.Unlock()
+ whisper.poolMu.Unlock()
if alreadyCached {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
} else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
- wh.statsMu.Lock()
- wh.stats.memoryUsed += envelope.size()
- wh.statsMu.Unlock()
- wh.postEvent(envelope, false) // notify the local node about the new message
- if wh.mailServer != nil {
- wh.mailServer.Archive(envelope)
+ whisper.statsMu.Lock()
+ whisper.stats.memoryUsed += envelope.size()
+ whisper.statsMu.Unlock()
+ whisper.postEvent(envelope, false) // notify the local node about the new message
+ if whisper.mailServer != nil {
+ whisper.mailServer.Archive(envelope)
}
}
return true, nil
}
// postEvent queues the message for further processing.
-func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
+func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) {
if isP2P {
- w.p2pMsgQueue <- envelope
+ whisper.p2pMsgQueue <- envelope
} else {
- w.checkOverflow()
- w.messageQueue <- envelope
+ whisper.checkOverflow()
+ whisper.messageQueue <- envelope
}
}
// checkOverflow checks if message queue overflow occurs and reports it if necessary.
-func (w *Whisper) checkOverflow() {
- queueSize := len(w.messageQueue)
+func (whisper *Whisper) checkOverflow() {
+ queueSize := len(whisper.messageQueue)
if queueSize == messageQueueLimit {
- if !w.Overflow() {
- w.settings.Store(overflowIdx, true)
+ if !whisper.Overflow() {
+ whisper.settings.Store(overflowIdx, true)
log.Warn("message queue overflow")
}
} else if queueSize <= messageQueueLimit/2 {
- if w.Overflow() {
- w.settings.Store(overflowIdx, false)
+ if whisper.Overflow() {
+ whisper.settings.Store(overflowIdx, false)
log.Warn("message queue overflow fixed (back to normal)")
}
}
}
// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
-func (w *Whisper) processQueue() {
+func (whisper *Whisper) processQueue() {
var e *Envelope
for {
select {
- case <-w.quit:
+ case <-whisper.quit:
return
- case e = <-w.messageQueue:
- w.filters.NotifyWatchers(e, false)
+ case e = <-whisper.messageQueue:
+ whisper.filters.NotifyWatchers(e, false)
- case e = <-w.p2pMsgQueue:
- w.filters.NotifyWatchers(e, true)
+ case e = <-whisper.p2pMsgQueue:
+ whisper.filters.NotifyWatchers(e, true)
}
}
}
// update loops until the lifetime of the whisper node, updating its internal
// state by expiring stale messages from the pool.
-func (w *Whisper) update() {
+func (whisper *Whisper) update() {
// Start a ticker to check for expirations
expire := time.NewTicker(expirationCycle)
@@ -746,9 +873,9 @@ func (w *Whisper) update() {
for {
select {
case <-expire.C:
- w.expire()
+ whisper.expire()
- case <-w.quit:
+ case <-whisper.quit:
return
}
}
@@ -756,46 +883,46 @@ func (w *Whisper) update() {
// expire iterates over all the expiration timestamps, removing all stale
// messages from the pools.
-func (w *Whisper) expire() {
- w.poolMu.Lock()
- defer w.poolMu.Unlock()
+func (whisper *Whisper) expire() {
+ whisper.poolMu.Lock()
+ defer whisper.poolMu.Unlock()
- w.statsMu.Lock()
- defer w.statsMu.Unlock()
- w.stats.reset()
+ whisper.statsMu.Lock()
+ defer whisper.statsMu.Unlock()
+ whisper.stats.reset()
now := uint32(time.Now().Unix())
- for expiry, hashSet := range w.expirations {
+ for expiry, hashSet := range whisper.expirations {
if expiry < now {
// Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool {
- sz := w.envelopes[v.(common.Hash)].size()
- delete(w.envelopes, v.(common.Hash))
- w.stats.messagesCleared++
- w.stats.memoryCleared += sz
- w.stats.memoryUsed -= sz
+ sz := whisper.envelopes[v.(common.Hash)].size()
+ delete(whisper.envelopes, v.(common.Hash))
+ whisper.stats.messagesCleared++
+ whisper.stats.memoryCleared += sz
+ whisper.stats.memoryUsed -= sz
return true
})
- w.expirations[expiry].Clear()
- delete(w.expirations, expiry)
+ whisper.expirations[expiry].Clear()
+ delete(whisper.expirations, expiry)
}
}
}
// Stats returns the whisper node statistics.
-func (w *Whisper) Stats() Statistics {
- w.statsMu.Lock()
- defer w.statsMu.Unlock()
+func (whisper *Whisper) Stats() Statistics {
+ whisper.statsMu.Lock()
+ defer whisper.statsMu.Unlock()
- return w.stats
+ return whisper.stats
}
// Envelopes retrieves all the messages currently pooled by the node.
-func (w *Whisper) Envelopes() []*Envelope {
- w.poolMu.RLock()
- defer w.poolMu.RUnlock()
+func (whisper *Whisper) Envelopes() []*Envelope {
+ whisper.poolMu.RLock()
+ defer whisper.poolMu.RUnlock()
- all := make([]*Envelope, 0, len(w.envelopes))
- for _, envelope := range w.envelopes {
+ all := make([]*Envelope, 0, len(whisper.envelopes))
+ for _, envelope := range whisper.envelopes {
all = append(all, envelope)
}
return all
@@ -803,13 +930,13 @@ func (w *Whisper) Envelopes() []*Envelope {
// Messages iterates through all currently floating envelopes
// and retrieves all the messages, that this filter could decrypt.
-func (w *Whisper) Messages(id string) []*ReceivedMessage {
+func (whisper *Whisper) Messages(id string) []*ReceivedMessage {
result := make([]*ReceivedMessage, 0)
- w.poolMu.RLock()
- defer w.poolMu.RUnlock()
+ whisper.poolMu.RLock()
+ defer whisper.poolMu.RUnlock()
- if filter := w.filters.Get(id); filter != nil {
- for _, env := range w.envelopes {
+ if filter := whisper.filters.Get(id); filter != nil {
+ for _, env := range whisper.envelopes {
msg := filter.processEnvelope(env)
if msg != nil {
result = append(result, msg)
@@ -820,11 +947,11 @@ func (w *Whisper) Messages(id string) []*ReceivedMessage {
}
// isEnvelopeCached checks if envelope with specific hash has already been received and cached.
-func (w *Whisper) isEnvelopeCached(hash common.Hash) bool {
- w.poolMu.Lock()
- defer w.poolMu.Unlock()
+func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool {
+ whisper.poolMu.Lock()
+ defer whisper.poolMu.Unlock()
- _, exist := w.envelopes[hash]
+ _, exist := whisper.envelopes[hash]
return exist
}
@@ -850,9 +977,16 @@ func validatePrivateKey(k *ecdsa.PrivateKey) bool {
return ValidatePublicKey(&k.PublicKey)
}
-// validateSymmetricKey returns false if the key contains all zeros
-func validateSymmetricKey(k []byte) bool {
- return len(k) > 0 && !containsOnlyZeros(k)
+// validateDataIntegrity returns false if the data have the wrong or contains all zeros,
+// which is the simplest and the most common bug.
+func validateDataIntegrity(k []byte, expectedSize int) bool {
+ if len(k) != expectedSize {
+ return false
+ }
+ if expectedSize > 3 && containsOnlyZeros(k) {
+ return false
+ }
+ return true
}
// containsOnlyZeros checks if the data contain only zeros.
@@ -886,14 +1020,49 @@ func BytesToUintBigEndian(b []byte) (res uint64) {
// GenerateRandomID generates a random string, which is then returned to be used as a key id
func GenerateRandomID() (id string, err error) {
- buf := make([]byte, keyIdSize)
- _, err = crand.Read(buf)
+ buf, err := generateSecureRandomData(keyIDSize)
if err != nil {
return "", err
}
- if !validateSymmetricKey(buf) {
+ if !validateDataIntegrity(buf, keyIDSize) {
return "", fmt.Errorf("error in generateRandomID: crypto/rand failed to generate random data")
}
id = common.Bytes2Hex(buf)
return id, err
}
+
+func isFullNode(bloom []byte) bool {
+ if bloom == nil {
+ return true
+ }
+ for _, b := range bloom {
+ if b != 255 {
+ return false
+ }
+ }
+ return true
+}
+
+func bloomFilterMatch(filter, sample []byte) bool {
+ if filter == nil {
+ return true
+ }
+
+ for i := 0; i < bloomFilterSize; i++ {
+ f := filter[i]
+ s := sample[i]
+ if (f | s) != f {
+ return false
+ }
+ }
+
+ return true
+}
+
+func addBloom(a, b []byte) []byte {
+ c := make([]byte, bloomFilterSize)
+ for i := 0; i < bloomFilterSize; i++ {
+ c[i] = a[i] | b[i]
+ }
+ return c
+}
diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go
index b391a1161..99e5f0bbb 100644
--- a/whisper/whisperv6/whisper_test.go
+++ b/whisper/whisperv6/whisper_test.go
@@ -81,7 +81,7 @@ func TestWhisperBasic(t *testing.T) {
}
derived := pbkdf2.Key([]byte(peerID), nil, 65356, aesKeyLength, sha256.New)
- if !validateSymmetricKey(derived) {
+ if !validateDataIntegrity(derived, aesKeyLength) {
t.Fatalf("failed validateSymmetricKey with param = %v.", derived)
}
if containsOnlyZeros(derived) {
@@ -448,24 +448,12 @@ func TestWhisperSymKeyManagement(t *testing.T) {
if !w.HasSymKey(id2) {
t.Fatalf("HasSymKey(id2) failed.")
}
- if k1 == nil {
- t.Fatalf("k1 does not exist.")
- }
- if k2 == nil {
- t.Fatalf("k2 does not exist.")
+ if !validateDataIntegrity(k2, aesKeyLength) {
+ t.Fatalf("key validation failed.")
}
if !bytes.Equal(k1, k2) {
t.Fatalf("k1 != k2.")
}
- if len(k1) != aesKeyLength {
- t.Fatalf("wrong length of k1.")
- }
- if len(k2) != aesKeyLength {
- t.Fatalf("wrong length of k2.")
- }
- if !validateSymmetricKey(k2) {
- t.Fatalf("key validation failed.")
- }
}
func TestExpiry(t *testing.T) {
@@ -843,3 +831,64 @@ func TestSymmetricSendKeyMismatch(t *testing.T) {
t.Fatalf("received a message when keys weren't matching")
}
}
+
+func TestBloom(t *testing.T) {
+ topic := TopicType{0, 0, 255, 6}
+ b := TopicToBloom(topic)
+ x := make([]byte, bloomFilterSize)
+ x[0] = byte(1)
+ x[32] = byte(1)
+ x[bloomFilterSize-1] = byte(128)
+ if !bloomFilterMatch(x, b) || !bloomFilterMatch(b, x) {
+ t.Fatalf("bloom filter does not match the mask")
+ }
+
+ _, err := mrand.Read(b)
+ if err != nil {
+ t.Fatalf("math rand error")
+ }
+ _, err = mrand.Read(x)
+ if err != nil {
+ t.Fatalf("math rand error")
+ }
+ if !bloomFilterMatch(b, b) {
+ t.Fatalf("bloom filter does not match self")
+ }
+ x = addBloom(x, b)
+ if !bloomFilterMatch(x, b) {
+ t.Fatalf("bloom filter does not match combined bloom")
+ }
+ if !isFullNode(nil) {
+ t.Fatalf("isFullNode did not recognize nil as full node")
+ }
+ x[17] = 254
+ if isFullNode(x) {
+ t.Fatalf("isFullNode false positive")
+ }
+ for i := 0; i < bloomFilterSize; i++ {
+ b[i] = byte(255)
+ }
+ if !isFullNode(b) {
+ t.Fatalf("isFullNode false negative")
+ }
+ if bloomFilterMatch(x, b) {
+ t.Fatalf("bloomFilterMatch false positive")
+ }
+ if !bloomFilterMatch(b, x) {
+ t.Fatalf("bloomFilterMatch false negative")
+ }
+
+ w := New(&DefaultConfig)
+ f := w.BloomFilter()
+ if f != nil {
+ t.Fatalf("wrong bloom on creation")
+ }
+ err = w.SetBloomFilter(x)
+ if err != nil {
+ t.Fatalf("failed to set bloom filter: %s", err)
+ }
+ f = w.BloomFilter()
+ if !bloomFilterMatch(f, x) || !bloomFilterMatch(x, f) {
+ t.Fatalf("retireved wrong bloom filter")
+ }
+}