diff options
Diffstat (limited to 'whisper/whisperv5/api.go')
-rw-r--r-- | whisper/whisperv5/api.go | 795 |
1 files changed, 438 insertions, 357 deletions
diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go index 1a4e4d879..fd25a4206 100644 --- a/whisper/whisperv5/api.go +++ b/whisper/whisperv5/api.go @@ -17,494 +17,575 @@ package whisperv5 import ( - "encoding/json" + "context" "errors" "fmt" + "time" + "crypto/ecdsa" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" ) -var whisperOfflineErr = errors.New("whisper is offline") +const ( + filterTimeout = 300 // filters are considered timeout out after filterTimeout seconds +) + +var ( + SymAsymErr = errors.New("specify either a symetric or a asymmetric key") + InvalidSymmetricKeyErr = errors.New("invalid symmetric key") + InvalidPublicKeyErr = errors.New("invalid public key") + InvalidSigningPubKey = errors.New("invalid signing public key") + TooLowPoWErr = errors.New("message rejected, PoW too low") + NoTopicsErr = errors.New("missing topic(s)") +) -// PublicWhisperAPI provides the whisper RPC service. +// PublicWhisperAPI provides the whisper RPC service that can be +// use publicly without security implications. type PublicWhisperAPI struct { - whisper *Whisper + w *Whisper + + mu sync.Mutex + lastUsed map[string]time.Time // keeps track when a filter was polled for the last time. } // NewPublicWhisperAPI create a new RPC whisper service. func NewPublicWhisperAPI(w *Whisper) *PublicWhisperAPI { - return &PublicWhisperAPI{whisper: w} + api := &PublicWhisperAPI{ + w: w, + lastUsed: make(map[string]time.Time), + } + + go api.run() + return api } -// Start starts the Whisper worker threads. -func (api *PublicWhisperAPI) Start() error { - if api.whisper == nil { - return whisperOfflineErr +// run the api event loop. +// this loop deletes filter that have not been used within filterTimeout +func (api *PublicWhisperAPI) run() { + timeout := time.NewTicker(2 * time.Minute) + for { + <-timeout.C + + api.mu.Lock() + for id, lastUsed := range api.lastUsed { + if time.Since(lastUsed).Seconds() >= filterTimeout { + delete(api.lastUsed, id) + if err := api.w.Unsubscribe(id); err != nil { + log.Error("could not unsubscribe whisper filter", "error", err) + } + log.Debug("delete whisper filter (timeout)", "id", id) + } + } + api.mu.Unlock() } - return api.whisper.Start(nil) } -// Stop stops the Whisper worker threads. -func (api *PublicWhisperAPI) Stop() error { - if api.whisper == nil { - return whisperOfflineErr - } - return api.whisper.Stop() +// Version returns the Whisper sub-protocol version. +func (api *PublicWhisperAPI) Version(ctx context.Context) string { + return ProtocolVersionStr } -// Version returns the Whisper version this node offers. -func (api *PublicWhisperAPI) Version() (hexutil.Uint, error) { - if api.whisper == nil { - return 0, whisperOfflineErr - } - return hexutil.Uint(api.whisper.Version()), nil +// Info contains diagnostic information. +type Info struct { + Memory int `json:"memory"` // Memory size of the floating messages in bytes. + Message int `json:"message"` // Number of floating messages. + MinPow float64 `json:"minPow"` // Minimal accepted PoW + MaxMessageSize uint32 `json:"maxMessageSize"` // Maximum accepted message size } -// Info returns the Whisper statistics for diagnostics. -func (api *PublicWhisperAPI) Info() (string, error) { - if api.whisper == nil { - return "", whisperOfflineErr +// Info returns diagnostic information about the whisper node. +func (api *PublicWhisperAPI) Info(ctx context.Context) Info { + stats := api.w.Stats() + return Info{ + Memory: stats.memoryUsed, + Message: len(api.w.messageQueue) + len(api.w.p2pMsgQueue), + MinPow: api.w.MinPow(), + MaxMessageSize: api.w.MaxMessageSize(), } - return api.whisper.Stats(), nil } -// SetMaxMessageLength sets the maximal message length allowed by this node -func (api *PublicWhisperAPI) SetMaxMessageLength(val int) error { - if api.whisper == nil { - return whisperOfflineErr - } - return api.whisper.SetMaxMessageLength(val) +// SetMaxMessageSize sets the maximum message size that is accepted. +// Upper limit is defined in whisperv5.MaxMessageSize. +func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32) (bool, error) { + return true, api.w.SetMaxMessageSize(size) } -// SetMinimumPoW sets the minimal PoW required by this node -func (api *PublicWhisperAPI) SetMinimumPoW(val float64) error { - if api.whisper == nil { - return whisperOfflineErr - } - return api.whisper.SetMinimumPoW(val) +// SetMinPow sets the minimum PoW for a message before it is accepted. +func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) { + return true, api.w.SetMinimumPoW(pow) } -// AllowP2PMessagesFromPeer marks specific peer trusted, which will allow it -// to send historic (expired) messages. -func (api *PublicWhisperAPI) AllowP2PMessagesFromPeer(enode string) error { - if api.whisper == nil { - return whisperOfflineErr - } +// MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages. +// Note: This function is not adding new nodes, the node needs to exists as a peer. +func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) { n, err := discover.ParseNode(enode) if err != nil { - return errors.New("failed to parse enode of trusted peer: " + err.Error()) + return false, err } - return api.whisper.AllowP2PMessagesFromPeer(n.ID[:]) + return true, api.w.AllowP2PMessagesFromPeer(n.ID[:]) } -// HasKeyPair checks if the whisper node is configured with the private key -// of the specified public pair. -func (api *PublicWhisperAPI) HasKeyPair(id string) (bool, error) { - if api.whisper == nil { - return false, whisperOfflineErr - } - return api.whisper.HasKeyPair(id), nil +// NewKeyPair generates a new public and private key pair for message decryption and encryption. +// It returns an ID that can be used to refer to the keypair. +func (api *PublicWhisperAPI) NewKeyPair(ctx context.Context) (string, error) { + return api.w.NewKeyPair() } -// DeleteKeyPair deletes the specifies key if it exists. -func (api *PublicWhisperAPI) DeleteKeyPair(id string) (bool, error) { - if api.whisper == nil { - return false, whisperOfflineErr +// AddPrivateKey imports the given private key. +func (api *PublicWhisperAPI) AddPrivateKey(ctx context.Context, privateKey hexutil.Bytes) (string, error) { + key, err := crypto.ToECDSA(privateKey) + if err != nil { + return "", err } - return api.whisper.DeleteKeyPair(id), nil + return api.w.AddKeyPair(key) } -// NewKeyPair generates a new cryptographic identity for the client, and injects -// it into the known identities for message decryption. -func (api *PublicWhisperAPI) NewKeyPair() (string, error) { - if api.whisper == nil { - return "", whisperOfflineErr +// DeleteKeyPair removes the key with the given key if it exists. +func (api *PublicWhisperAPI) DeleteKeyPair(ctx context.Context, key string) (bool, error) { + if ok := api.w.DeleteKeyPair(key); ok { + return true, nil } - return api.whisper.NewKeyPair() + return false, fmt.Errorf("key pair %s not found", key) } -// GetPublicKey returns the public key for identity id -func (api *PublicWhisperAPI) GetPublicKey(id string) (hexutil.Bytes, error) { - if api.whisper == nil { - return nil, whisperOfflineErr - } - key, err := api.whisper.GetPrivateKey(id) +// HasKeyPair returns an indication if the node has a key pair that is associated with the given id. +func (api *PublicWhisperAPI) HasKeyPair(ctx context.Context, id string) bool { + return api.w.HasKeyPair(id) +} + +// GetPublicKey returns the public key associated with the given key. The key is the hex +// encoded representation of a key in the form specified in section 4.3.6 of ANSI X9.62. +func (api *PublicWhisperAPI) GetPublicKey(ctx context.Context, id string) (hexutil.Bytes, error) { + key, err := api.w.GetPrivateKey(id) if err != nil { - return nil, err + return hexutil.Bytes{}, err } return crypto.FromECDSAPub(&key.PublicKey), nil } -// GetPrivateKey returns the private key for identity id -func (api *PublicWhisperAPI) GetPrivateKey(id string) (string, error) { - if api.whisper == nil { - return "", whisperOfflineErr - } - key, err := api.whisper.GetPrivateKey(id) +// GetPublicKey returns the private key associated with the given key. The key is the hex +// encoded representation of a key in the form specified in section 4.3.6 of ANSI X9.62. +func (api *PublicWhisperAPI) GetPrivateKey(ctx context.Context, id string) (hexutil.Bytes, error) { + key, err := api.w.GetPrivateKey(id) if err != nil { - return "", err + return hexutil.Bytes{}, err } - return common.ToHex(crypto.FromECDSA(key)), nil + return crypto.FromECDSA(key), nil } -// GenerateSymmetricKey generates a random symmetric key and stores it under id, -// which is then returned. Will be used in the future for session key exchange. -func (api *PublicWhisperAPI) GenerateSymmetricKey() (string, error) { - if api.whisper == nil { - return "", whisperOfflineErr - } - return api.whisper.GenerateSymKey() +// NewSymKey generate a random symmetric key. +// It returns an ID that can be used to refer to the key. +// Can be used encrypting and decrypting messages where the key is known to both parties. +func (api *PublicWhisperAPI) NewSymKey(ctx context.Context) (string, error) { + return api.w.GenerateSymKey() } -// AddSymmetricKeyDirect stores the key, and returns its id. -func (api *PublicWhisperAPI) AddSymmetricKeyDirect(key hexutil.Bytes) (string, error) { - if api.whisper == nil { - return "", whisperOfflineErr - } - return api.whisper.AddSymKeyDirect(key) +// AddSymKey import a symmetric key. +// It returns an ID that can be used to refer to the key. +// Can be used encrypting and decrypting messages where the key is known to both parties. +func (api *PublicWhisperAPI) AddSymKey(ctx context.Context, key hexutil.Bytes) (string, error) { + return api.w.AddSymKeyDirect([]byte(key)) } -// AddSymmetricKeyFromPassword generates the key from password, stores it, and returns its id. -func (api *PublicWhisperAPI) AddSymmetricKeyFromPassword(password string) (string, error) { - if api.whisper == nil { - return "", whisperOfflineErr - } - return api.whisper.AddSymKeyFromPassword(password) +// GenerateSymKeyFromPassword derive a key from the given password, stores it, and returns its ID. +func (api *PublicWhisperAPI) GenerateSymKeyFromPassword(ctx context.Context, passwd string) (string, error) { + return api.w.AddSymKeyFromPassword(passwd) } -// HasSymmetricKey returns true if there is a key associated with the given id. -// Otherwise returns false. -func (api *PublicWhisperAPI) HasSymmetricKey(id string) (bool, error) { - if api.whisper == nil { - return false, whisperOfflineErr - } - res := api.whisper.HasSymKey(id) - return res, nil +// HasSymKey returns an indication if the node has a symmetric key associated with the given key. +func (api *PublicWhisperAPI) HasSymKey(ctx context.Context, id string) bool { + return api.w.HasSymKey(id) } -// GetSymmetricKey returns the symmetric key associated with the given id. -func (api *PublicWhisperAPI) GetSymmetricKey(name string) (hexutil.Bytes, error) { - if api.whisper == nil { - return nil, whisperOfflineErr - } - b, err := api.whisper.GetSymKey(name) - if err != nil { - return nil, err - } - return b, nil +// GetSymKey returns the symmetric key associated with the given id. +func (api *PublicWhisperAPI) GetSymKey(ctx context.Context, id string) (hexutil.Bytes, error) { + return api.w.GetSymKey(id) } -// DeleteSymmetricKey deletes the key associated with the name string if it exists. -func (api *PublicWhisperAPI) DeleteSymmetricKey(name string) (bool, error) { - if api.whisper == nil { - return false, whisperOfflineErr - } - res := api.whisper.DeleteSymKey(name) - return res, nil +// DeleteSymKey deletes the symmetric key that is associated with the given id. +func (api *PublicWhisperAPI) DeleteSymKey(ctx context.Context, id string) bool { + return api.w.DeleteSymKey(id) } -// Subscribe creates and registers a new filter to watch for inbound whisper messages. -// Returns the ID of the newly created filter. -func (api *PublicWhisperAPI) Subscribe(args WhisperFilterArgs) (string, error) { - if api.whisper == nil { - return "", whisperOfflineErr - } +//go:generate gencodec -type NewMessage -field-override newMessageOverride -out gen_newmessage_json.go + +// NewMessage represents a new whisper message that is posted through the RPC. +type NewMessage struct { + SymKeyID string `json:"symKeyID"` + PublicKey []byte `json:"pubKey"` + Sig string `json:"sig"` + TTL uint32 `json:"ttl"` + Topic TopicType `json:"topic"` + Payload []byte `json:"payload"` + Padding []byte `json:"padding"` + PowTime uint32 `json:"powTime"` + PowTarget float64 `json:"powTarget"` + TargetPeer string `json:"targetPeer"` +} - filter := Filter{ - PoW: args.MinPoW, - Messages: make(map[common.Hash]*ReceivedMessage), - AllowP2P: args.AllowP2P, - } +type newMessageOverride struct { + PublicKey hexutil.Bytes + Payload hexutil.Bytes + Padding hexutil.Bytes +} - var err error - for i, bt := range args.Topics { - if len(bt) == 0 || len(bt) > 4 { - return "", errors.New(fmt.Sprintf("subscribe: topic %d has wrong size: %d", i, len(bt))) - } - filter.Topics = append(filter.Topics, bt) +// Post a message on the Whisper network. +func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) { + var ( + symKeyGiven = len(req.SymKeyID) > 0 + pubKeyGiven = len(req.PublicKey) > 0 + err error + ) + + // user must specify either a symmetric or a asymmetric key + if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { + return false, SymAsymErr } - if err = ValidateKeyID(args.Key); err != nil { - return "", errors.New("subscribe: " + err.Error()) + params := &MessageParams{ + TTL: req.TTL, + Payload: req.Payload, + Padding: req.Padding, + WorkTime: req.PowTime, + PoW: req.PowTarget, + Topic: req.Topic, } - if len(args.Sig) > 0 { - sb := common.FromHex(args.Sig) - if sb == nil { - return "", errors.New("subscribe: sig parameter is invalid") - } - filter.Src = crypto.ToECDSAPub(sb) - if !ValidatePublicKey(filter.Src) { - return "", errors.New("subscribe: invalid 'sig' field") + // Set key that is used to sign the message + if len(req.Sig) > 0 { + if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil { + return false, err } } - if args.Symmetric { - if len(args.Topics) == 0 { - return "", errors.New("subscribe: at least one topic must be specified with symmetric encryption") + // Set symmetric key that is used to encrypt the message + if symKeyGiven { + if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption + return false, NoTopicsErr } - symKey, err := api.whisper.GetSymKey(args.Key) - if err != nil { - return "", errors.New("subscribe: invalid key ID") + if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { + return false, err } - if !validateSymmetricKey(symKey) { - return "", errors.New("subscribe: retrieved key is invalid") - } - filter.KeySym = symKey - filter.SymKeyHash = crypto.Keccak256Hash(filter.KeySym) - } else { - filter.KeyAsym, err = api.whisper.GetPrivateKey(args.Key) - if err != nil { - return "", errors.New("subscribe: invalid key ID") + if !validateSymmetricKey(params.KeySym) { + return false, InvalidSymmetricKeyErr } - if filter.KeyAsym == nil { - return "", errors.New("subscribe: non-existent identity provided") + } + + // Set asymmetric key that is used to encrypt the message + if pubKeyGiven { + params.Dst = crypto.ToECDSAPub(req.PublicKey) + if !ValidatePublicKey(params.Dst) { + return false, InvalidPublicKeyErr } } - return api.whisper.Subscribe(&filter) -} + // encrypt and sent message + whisperMsg, err := NewSentMessage(params) + if err != nil { + return false, err + } -// Unsubscribe disables and removes an existing filter. -func (api *PublicWhisperAPI) Unsubscribe(id string) { - api.whisper.Unsubscribe(id) -} + env, err := whisperMsg.Wrap(params) + if err != nil { + return false, err + } + + // send to specific node (skip PoW check) + if len(req.TargetPeer) > 0 { + n, err := discover.ParseNode(req.TargetPeer) + if err != nil { + return false, fmt.Errorf("failed to parse target peer: %s", err) + } + return true, api.w.SendP2PMessage(n.ID[:], env) + } -// GetSubscriptionMessages retrieves all the new messages matched by the corresponding -// subscription filter since the last retrieval. -func (api *PublicWhisperAPI) GetNewSubscriptionMessages(id string) []*WhisperMessage { - f := api.whisper.GetFilter(id) - if f != nil { - newMail := f.Retrieve() - return toWhisperMessages(newMail) + // ensure that the message PoW meets the node's minimum accepted PoW + if req.PowTarget < api.w.MinPow() { + return false, TooLowPoWErr } - return toWhisperMessages(nil) + + return true, api.w.Send(env) } -// GetMessages retrieves all the floating messages that match a specific subscription filter. -// It is likely to be called once per session, right after Subscribe call. -func (api *PublicWhisperAPI) GetFloatingMessages(id string) []*WhisperMessage { - all := api.whisper.Messages(id) - return toWhisperMessages(all) +//go:generate gencodec -type Criteria -field-override criteriaOverride -out gen_criteria_json.go + +// Criteria holds various filter options for inbound messages. +type Criteria struct { + SymKeyID string `json:"symKeyID"` + PrivateKeyID string `json:"privateKeyID"` + Sig []byte `json:"sig"` + MinPow float64 `json:"minPow"` + Topics []TopicType `json:"topics"` + AllowP2P bool `json:"allowP2P"` } -// toWhisperMessages converts a Whisper message to a RPC whisper message. -func toWhisperMessages(messages []*ReceivedMessage) []*WhisperMessage { - msgs := make([]*WhisperMessage, len(messages)) - for i, msg := range messages { - msgs[i] = NewWhisperMessage(msg) - } - return msgs +type criteriaOverride struct { + Sig hexutil.Bytes } -// Post creates a whisper message and injects it into the network for distribution. -func (api *PublicWhisperAPI) Post(args PostArgs) error { - if api.whisper == nil { - return whisperOfflineErr +// Messages set up a subscription that fires events when messages arrive that match +// the given set of criteria. +func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Subscription, error) { + var ( + symKeyGiven = len(crit.SymKeyID) > 0 + pubKeyGiven = len(crit.PrivateKeyID) > 0 + err error + ) + + // ensure that the RPC connection supports subscriptions + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported } - var err error - params := MessageParams{ - TTL: args.TTL, - WorkTime: args.PowTime, - PoW: args.PowTarget, - Payload: args.Payload, - Padding: args.Padding, + // user must specify either a symmetric or a asymmetric key + if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { + return nil, SymAsymErr } - if len(args.Key) == 0 { - return errors.New("post: key is missing") + filter := Filter{ + PoW: crit.MinPow, + Messages: make(map[common.Hash]*ReceivedMessage), + AllowP2P: crit.AllowP2P, } - if len(args.Sig) > 0 { - params.Src, err = api.whisper.GetPrivateKey(args.Sig) - if err != nil { - return err - } - if params.Src == nil { - return errors.New("post: empty identity") + if len(crit.Sig) > 0 { + filter.Src = crypto.ToECDSAPub(crit.Sig) + if !ValidatePublicKey(filter.Src) { + return nil, InvalidSigningPubKey } } - if len(args.Topic) == TopicLength { - params.Topic = BytesToTopic(args.Topic) - } else if len(args.Topic) != 0 { - return errors.New(fmt.Sprintf("post: wrong topic size %d", len(args.Topic))) + for i, bt := range crit.Topics { + if len(bt) == 0 || len(bt) > 4 { + return nil, fmt.Errorf("subscribe: topic %d has wrong size: %d", i, len(bt)) + } + filter.Topics = append(filter.Topics, bt[:]) } - if args.Type == "sym" { - if err = ValidateKeyID(args.Key); err != nil { - return err + // listen for message that are encrypted with the given symmetric key + if symKeyGiven { + if len(filter.Topics) == 0 { + return nil, NoTopicsErr } - params.KeySym, err = api.whisper.GetSymKey(args.Key) + key, err := api.w.GetSymKey(crit.SymKeyID) if err != nil { - return err + return nil, err } - if !validateSymmetricKey(params.KeySym) { - return errors.New("post: key for symmetric encryption is invalid") - } - if len(params.Topic) == 0 { - return errors.New("post: topic is missing for symmetric encryption") + if !validateSymmetricKey(key) { + return nil, InvalidSymmetricKeyErr } - } else if args.Type == "asym" { - kb := common.FromHex(args.Key) - if kb == nil { - return errors.New("post: public key for asymmetric encryption is invalid") - } - params.Dst = crypto.ToECDSAPub(kb) - if !ValidatePublicKey(params.Dst) { - return errors.New("post: public key for asymmetric encryption is invalid") - } - } else { - return errors.New("post: wrong type (sym/asym)") + filter.KeySym = key + filter.SymKeyHash = crypto.Keccak256Hash(filter.KeySym) } - // encrypt and send - message, err := NewSentMessage(¶ms) - if err != nil { - return err + // listen for messages that are encrypted with the given public key + if pubKeyGiven { + filter.KeyAsym, err = api.w.GetPrivateKey(crit.PrivateKeyID) + if err != nil || filter.KeyAsym == nil { + return nil, InvalidPublicKeyErr + } } - envelope, err := message.Wrap(¶ms) + + id, err := api.w.Subscribe(&filter) if err != nil { - return err - } - if envelope.size() > api.whisper.maxMsgLength { - return errors.New("post: message is too big") + return nil, err } - if len(args.TargetPeer) != 0 { - n, err := discover.ParseNode(args.TargetPeer) - if err != nil { - return errors.New("post: failed to parse enode of target peer: " + err.Error()) - } - return api.whisper.SendP2PMessage(n.ID[:], envelope) - } else if args.PowTarget < api.whisper.minPoW { - return errors.New("post: target PoW is less than minimum PoW, the message can not be sent") - } - - return api.whisper.Send(envelope) -} - -type PostArgs struct { - Type string `json:"type"` // "sym"/"asym" (symmetric or asymmetric) - TTL uint32 `json:"ttl"` // time-to-live in seconds - Sig string `json:"sig"` // id of the signing key - Key string `json:"key"` // key id (in case of sym) or public key (in case of asym) - Topic hexutil.Bytes `json:"topic"` // topic (4 bytes) - Padding hexutil.Bytes `json:"padding"` // optional padding bytes - Payload hexutil.Bytes `json:"payload"` // payload to be encrypted - PowTime uint32 `json:"powTime"` // maximal time in seconds to be spent on PoW - PowTarget float64 `json:"powTarget"` // minimal PoW required for this message - TargetPeer string `json:"targetPeer"` // peer id (for p2p message only) -} - -type WhisperFilterArgs struct { - Symmetric bool // encryption type - Key string // id of the key to be used for decryption - Sig string // public key of the sender to be verified - MinPoW float64 // minimal PoW requirement - Topics [][]byte // list of topics (up to 4 bytes each) to match - AllowP2P bool // indicates wheather direct p2p messages are allowed for this filter -} - -// UnmarshalJSON implements the json.Unmarshaler interface, invoked to convert a -// JSON message blob into a WhisperFilterArgs structure. -func (args *WhisperFilterArgs) UnmarshalJSON(b []byte) (err error) { - // Unmarshal the JSON message and sanity check - var obj struct { - Type string `json:"type"` - Key string `json:"key"` - Sig string `json:"sig"` - MinPoW float64 `json:"minPoW"` - Topics []interface{} `json:"topics"` - AllowP2P bool `json:"allowP2P"` - } - if err := json.Unmarshal(b, &obj); err != nil { - return err - } - - switch obj.Type { - case "sym": - args.Symmetric = true - case "asym": - args.Symmetric = false - default: - return errors.New("wrong type (sym/asym)") - } - - args.Key = obj.Key - args.Sig = obj.Sig - args.MinPoW = obj.MinPoW - args.AllowP2P = obj.AllowP2P - - // Construct the topic array - if obj.Topics != nil { - topics := make([]string, len(obj.Topics)) - for i, field := range obj.Topics { - switch value := field.(type) { - case string: - topics[i] = value - case nil: - return fmt.Errorf("topic[%d] is empty", i) - default: - return fmt.Errorf("topic[%d] is not a string", i) - } - } - topicsDecoded := make([][]byte, len(topics)) - for j, s := range topics { - x := common.FromHex(s) - if x == nil || len(x) > TopicLength { - return fmt.Errorf("topic[%d] is invalid", j) + // create subscription and start waiting for message events + rpcSub := notifier.CreateSubscription() + go func() { + // for now poll internally, refactor whisper internal for channel support + ticker := time.NewTicker(250 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if filter := api.w.GetFilter(id); filter != nil { + for _, rpcMessage := range toMessage(filter.Retrieve()) { + if err := notifier.Notify(rpcSub.ID, rpcMessage); err != nil { + log.Error("Failed to send notification", "err", err) + } + } + } + case <-rpcSub.Err(): + api.w.Unsubscribe(id) + return + case <-notifier.Closed(): + api.w.Unsubscribe(id) + return } - topicsDecoded[j] = x } - args.Topics = topicsDecoded - } + }() - return nil + return rpcSub, nil } -// WhisperMessage is the RPC representation of a whisper message. -type WhisperMessage struct { - Topic string `json:"topic"` - Payload string `json:"payload"` - Padding string `json:"padding"` - Src string `json:"sig"` - Dst string `json:"recipientPublicKey"` - Timestamp uint32 `json:"timestamp"` - TTL uint32 `json:"ttl"` - PoW float64 `json:"pow"` - Hash string `json:"hash"` +//go:generate gencodec -type Message -field-override messageOverride -out gen_message_json.go + +// Message is the RPC representation of a whisper message. +type Message struct { + Sig []byte `json:"sig,omitempty"` + TTL uint32 `json:"ttl"` + Timestamp uint32 `json:"timestamp"` + Topic TopicType `json:"topic"` + Payload []byte `json:"payload"` + Padding []byte `json:"padding"` + PoW float64 `json:"pow"` + Hash []byte `json:"hash"` + Dst []byte `json:"recipientPublicKey,omitempty"` } -// NewWhisperMessage converts an internal message into an API version. -func NewWhisperMessage(message *ReceivedMessage) *WhisperMessage { - msg := WhisperMessage{ - Payload: common.ToHex(message.Payload), - Padding: common.ToHex(message.Padding), +type messageOverride struct { + Sig hexutil.Bytes + Payload hexutil.Bytes + Padding hexutil.Bytes + Hash hexutil.Bytes + Dst hexutil.Bytes +} + +// ToWhisperMessage converts an internal message into an API version. +func ToWhisperMessage(message *ReceivedMessage) *Message { + msg := Message{ + Payload: message.Payload, + Padding: message.Padding, Timestamp: message.Sent, TTL: message.TTL, PoW: message.PoW, - Hash: common.ToHex(message.EnvelopeHash.Bytes()), + Hash: message.EnvelopeHash.Bytes(), + Topic: message.Topic, } - if len(message.Topic) == TopicLength { - msg.Topic = common.ToHex(message.Topic[:]) - } if message.Dst != nil { b := crypto.FromECDSAPub(message.Dst) if b != nil { - msg.Dst = common.ToHex(b) + msg.Dst = b } } + if isMessageSigned(message.Raw[0]) { b := crypto.FromECDSAPub(message.SigToPubKey()) if b != nil { - msg.Src = common.ToHex(b) + msg.Sig = b } } + return &msg } + +// toMessage converts a set of messages to its RPC representation. +func toMessage(messages []*ReceivedMessage) []*Message { + msgs := make([]*Message, len(messages)) + for i, msg := range messages { + msgs[i] = ToWhisperMessage(msg) + } + return msgs +} + +// GetFilterMessages returns the messages that match the filter criteria and +// are received between the last poll and now. +func (api *PublicWhisperAPI) GetFilterMessages(id string) ([]*Message, error) { + api.mu.Lock() + f := api.w.GetFilter(id) + if f == nil { + api.mu.Unlock() + return nil, fmt.Errorf("filter not found") + } + api.lastUsed[id] = time.Now() + api.mu.Unlock() + + receivedMessages := f.Retrieve() + messages := make([]*Message, 0, len(receivedMessages)) + for _, msg := range receivedMessages { + messages = append(messages, ToWhisperMessage(msg)) + } + + return messages, nil +} + +// DeleteMessageFilter deletes a filter. +func (api *PublicWhisperAPI) DeleteMessageFilter(id string) (bool, error) { + api.mu.Lock() + defer api.mu.Unlock() + + delete(api.lastUsed, id) + return true, api.w.Unsubscribe(id) +} + +// NewMessageFilter creates a new filter that can be used to poll for +// (new) messages that satisfy the given criteria. +func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) { + var ( + src *ecdsa.PublicKey + keySym []byte + keyAsym *ecdsa.PrivateKey + topics [][]byte + + symKeyGiven = len(req.SymKeyID) > 0 + asymKeyGiven = len(req.PrivateKeyID) > 0 + + err error + ) + + // user must specify either a symmetric or a asymmetric key + if (symKeyGiven && asymKeyGiven) || (!symKeyGiven && !asymKeyGiven) { + return "", SymAsymErr + } + + if len(req.Sig) > 0 { + src = crypto.ToECDSAPub(req.Sig) + if !ValidatePublicKey(src) { + return "", InvalidSigningPubKey + } + } + + if symKeyGiven { + if keySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { + return "", err + } + if !validateSymmetricKey(keySym) { + return "", InvalidSymmetricKeyErr + } + } + + if asymKeyGiven { + if keyAsym, err = api.w.GetPrivateKey(req.PrivateKeyID); err != nil { + return "", err + } + } + + if len(req.Topics) > 0 { + topics = make([][]byte, 1) + for _, topic := range req.Topics { + topics = append(topics, topic[:]) + } + } + + f := &Filter{ + Src: src, + KeySym: keySym, + KeyAsym: keyAsym, + PoW: req.MinPow, + AllowP2P: req.AllowP2P, + Topics: topics, + Messages: make(map[common.Hash]*ReceivedMessage), + } + + id, err := api.w.Subscribe(f) + if err != nil { + return "", err + } + + api.mu.Lock() + api.lastUsed[id] = time.Now() + api.mu.Unlock() + + return id, nil +} |