diff options
Diffstat (limited to 'swarm/pss/client/client.go')
-rw-r--r-- | swarm/pss/client/client.go | 352 |
1 files changed, 0 insertions, 352 deletions
diff --git a/swarm/pss/client/client.go b/swarm/pss/client/client.go deleted file mode 100644 index 5ee387aa7..000000000 --- a/swarm/pss/client/client.go +++ /dev/null @@ -1,352 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// +build !noclient,!noprotocol - -package client - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/protocols" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/pss" -) - -const ( - handshakeRetryTimeout = 1000 - handshakeRetryCount = 3 -) - -// The pss client provides devp2p emulation over pss RPC API, -// giving access to pss methods from a different process -type Client struct { - BaseAddrHex string - - // peers - peerPool map[pss.Topic]map[string]*pssRPCRW - protos map[pss.Topic]*p2p.Protocol - - // rpc connections - rpc *rpc.Client - subs []*rpc.ClientSubscription - - // channels - topicsC chan []byte - quitC chan struct{} - - poolMu sync.Mutex -} - -// implements p2p.MsgReadWriter -type pssRPCRW struct { - *Client - topic string - msgC chan []byte - addr pss.PssAddress - pubKeyId string - lastSeen time.Time - closed bool -} - -func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) { - topic := topicobj.String() - err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:])) - if err != nil { - return nil, fmt.Errorf("setpeer %s %s: %v", topic, pubkeyid, err) - } - return &pssRPCRW{ - Client: c, - topic: topic, - msgC: make(chan []byte), - addr: addr, - pubKeyId: pubkeyid, - }, nil -} - -func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) { - msg := <-rw.msgC - log.Trace("pssrpcrw read", "msg", msg) - pmsg, err := pss.ToP2pMsg(msg) - if err != nil { - return p2p.Msg{}, err - } - - return pmsg, nil -} - -// If only one message slot left -// then new is requested through handshake -// if buffer is empty, handshake request blocks until return -// after which pointer is changed to first new key in buffer -// will fail if: -// - any api calls fail -// - handshake retries are exhausted without reply, -// - send fails -func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error { - log.Trace("got writemsg pssclient", "msg", msg) - if rw.closed { - return fmt.Errorf("connection closed") - } - rlpdata := make([]byte, msg.Size) - msg.Payload.Read(rlpdata) - pmsg, err := rlp.EncodeToBytes(pss.ProtocolMsg{ - Code: msg.Code, - Size: msg.Size, - Payload: rlpdata, - }) - if err != nil { - return err - } - - // Get the keys we have - var symkeyids []string - err = rw.Client.rpc.Call(&symkeyids, "pss_getHandshakeKeys", rw.pubKeyId, rw.topic, false, true) - if err != nil { - return err - } - - // Check the capacity of the first key - var symkeycap uint16 - if len(symkeyids) > 0 { - err = rw.Client.rpc.Call(&symkeycap, "pss_getHandshakeKeyCapacity", symkeyids[0]) - if err != nil { - return err - } - } - - err = rw.Client.rpc.Call(nil, "pss_sendSym", symkeyids[0], rw.topic, hexutil.Encode(pmsg)) - if err != nil { - return err - } - - // If this is the last message it is valid for, initiate new handshake - if symkeycap == 1 { - var retries int - var sync bool - // if it's the only remaining key, make sure we don't continue until we have new ones for further writes - if len(symkeyids) == 1 { - sync = true - } - // initiate handshake - _, err := rw.handshake(retries, sync, false) - if err != nil { - log.Warn("failing", "err", err) - return err - } - } - return nil -} - -// retry and synchronicity wrapper for handshake api call -// returns first new symkeyid upon successful execution -func (rw *pssRPCRW) handshake(retries int, sync bool, flush bool) (string, error) { - - var symkeyids []string - var i int - // request new keys - // if the key buffer was depleted, make this as a blocking call and try several times before giving up - for i = 0; i < 1+retries; i++ { - log.Debug("handshake attempt pssrpcrw", "pubkeyid", rw.pubKeyId, "topic", rw.topic, "sync", sync) - err := rw.Client.rpc.Call(&symkeyids, "pss_handshake", rw.pubKeyId, rw.topic, sync, flush) - if err == nil { - var keyid string - if sync { - keyid = symkeyids[0] - } - return keyid, nil - } - if i-1+retries > 1 { - time.Sleep(time.Millisecond * handshakeRetryTimeout) - } - } - - return "", fmt.Errorf("handshake failed after %d attempts", i) -} - -// Custom constructor -// -// Provides direct access to the rpc object -func NewClient(rpcurl string) (*Client, error) { - rpcclient, err := rpc.Dial(rpcurl) - if err != nil { - return nil, err - } - - client, err := NewClientWithRPC(rpcclient) - if err != nil { - return nil, err - } - return client, nil -} - -// Main constructor -// -// The 'rpcclient' parameter allows passing a in-memory rpc client to act as the remote websocket RPC. -func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) { - client := newClient() - client.rpc = rpcclient - err := client.rpc.Call(&client.BaseAddrHex, "pss_baseAddr") - if err != nil { - return nil, fmt.Errorf("cannot get pss node baseaddress: %v", err) - } - return client, nil -} - -func newClient() (client *Client) { - client = &Client{ - quitC: make(chan struct{}), - peerPool: make(map[pss.Topic]map[string]*pssRPCRW), - protos: make(map[pss.Topic]*p2p.Protocol), - } - return -} - -// Mounts a new devp2p protcool on the pss connection -// -// the protocol is aliased as a "pss topic" -// uses normal devp2p send and incoming message handler routines from the p2p/protocols package -// -// when an incoming message is received from a peer that is not yet known to the client, -// this peer object is instantiated, and the protocol is run on it. -func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error { - topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version))) - topichex := topicobj.String() - msgC := make(chan pss.APIMsg) - c.peerPool[topicobj] = make(map[string]*pssRPCRW) - sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex, false, false) - if err != nil { - return fmt.Errorf("pss event subscription failed: %v", err) - } - c.subs = append(c.subs, sub) - err = c.rpc.Call(nil, "pss_addHandshake", topichex) - if err != nil { - return fmt.Errorf("pss handshake activation failed: %v", err) - } - - // dispatch incoming messages - go func() { - for { - select { - case msg := <-msgC: - // we only allow sym msgs here - if msg.Asymmetric { - continue - } - // we get passed the symkeyid - // need the symkey itself to resolve to peer's pubkey - var pubkeyid string - err = c.rpc.Call(&pubkeyid, "pss_getHandshakePublicKey", msg.Key) - if err != nil || pubkeyid == "" { - log.Trace("proto err or no pubkey", "err", err, "symkeyid", msg.Key) - continue - } - // if we don't have the peer on this protocol already, create it - // this is more or less the same as AddPssPeer, less the handshake initiation - if c.peerPool[topicobj][pubkeyid] == nil { - var addrhex string - err := c.rpc.Call(&addrhex, "pss_getAddress", topichex, false, msg.Key) - if err != nil { - log.Trace(err.Error()) - continue - } - addrbytes, err := hexutil.Decode(addrhex) - if err != nil { - log.Trace(err.Error()) - break - } - addr := pss.PssAddress(addrbytes) - rw, err := c.newpssRPCRW(pubkeyid, addr, topicobj) - if err != nil { - break - } - c.peerPool[topicobj][pubkeyid] = rw - p := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%v", addr), []p2p.Cap{}) - go proto.Run(p, c.peerPool[topicobj][pubkeyid]) - } - go func() { - c.peerPool[topicobj][pubkeyid].msgC <- msg.Msg - }() - case <-c.quitC: - return - } - } - }() - - c.protos[topicobj] = proto - return nil -} - -// Always call this to ensure that we exit cleanly -func (c *Client) Close() error { - for _, s := range c.subs { - s.Unsubscribe() - } - return nil -} - -// Add a pss peer (public key) and run the protocol on it -// -// client.RunProtocol with matching topic must have been -// run prior to adding the peer, or this method will -// return an error. -// -// The key must exist in the key store of the pss node -// before the peer is added. The method will return an error -// if it is not. -func (c *Client) AddPssPeer(pubkeyid string, addr []byte, spec *protocols.Spec) error { - topic := pss.ProtocolTopic(spec) - if c.peerPool[topic] == nil { - return errors.New("addpeer on unset topic") - } - if c.peerPool[topic][pubkeyid] == nil { - rw, err := c.newpssRPCRW(pubkeyid, addr, topic) - if err != nil { - return err - } - _, err = rw.handshake(handshakeRetryCount, true, true) - if err != nil { - return err - } - c.poolMu.Lock() - c.peerPool[topic][pubkeyid] = rw - c.poolMu.Unlock() - p := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%v", addr), []p2p.Cap{}) - go c.protos[topic].Run(p, c.peerPool[topic][pubkeyid]) - } - return nil -} - -// Remove a pss peer -// -// TODO: underlying cleanup -func (c *Client) RemovePssPeer(pubkeyid string, spec *protocols.Spec) { - log.Debug("closing pss client peer", "pubkey", pubkeyid, "protoname", spec.Name, "protoversion", spec.Version) - c.poolMu.Lock() - defer c.poolMu.Unlock() - topic := pss.ProtocolTopic(spec) - c.peerPool[topic][pubkeyid].closed = true - delete(c.peerPool[topic], pubkeyid) -} |