aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/pss/client
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/pss/client
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloaddexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/pss/client')
-rw-r--r--swarm/pss/client/client.go354
-rw-r--r--swarm/pss/client/client_test.go302
-rw-r--r--swarm/pss/client/doc.go96
3 files changed, 752 insertions, 0 deletions
diff --git a/swarm/pss/client/client.go b/swarm/pss/client/client.go
new file mode 100644
index 000000000..532a22384
--- /dev/null
+++ b/swarm/pss/client/client.go
@@ -0,0 +1,354 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build !noclient,!noprotocol
+
+package client
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/pss"
+)
+
+const (
+ handshakeRetryTimeout = 1000
+ handshakeRetryCount = 3
+)
+
+// The pss client provides devp2p emulation over pss RPC API,
+// giving access to pss methods from a different process
+type Client struct {
+ BaseAddrHex string
+
+ // peers
+ peerPool map[pss.Topic]map[string]*pssRPCRW
+ protos map[pss.Topic]*p2p.Protocol
+
+ // rpc connections
+ rpc *rpc.Client
+ subs []*rpc.ClientSubscription
+
+ // channels
+ topicsC chan []byte
+ quitC chan struct{}
+
+ poolMu sync.Mutex
+}
+
+// implements p2p.MsgReadWriter
+type pssRPCRW struct {
+ *Client
+ topic string
+ msgC chan []byte
+ addr pss.PssAddress
+ pubKeyId string
+ lastSeen time.Time
+ closed bool
+}
+
+func (c *Client) newpssRPCRW(pubkeyid string, addr pss.PssAddress, topicobj pss.Topic) (*pssRPCRW, error) {
+ topic := topicobj.String()
+ err := c.rpc.Call(nil, "pss_setPeerPublicKey", pubkeyid, topic, hexutil.Encode(addr[:]))
+ if err != nil {
+ return nil, fmt.Errorf("setpeer %s %s: %v", topic, pubkeyid, err)
+ }
+ return &pssRPCRW{
+ Client: c,
+ topic: topic,
+ msgC: make(chan []byte),
+ addr: addr,
+ pubKeyId: pubkeyid,
+ }, nil
+}
+
+func (rw *pssRPCRW) ReadMsg() (p2p.Msg, error) {
+ msg := <-rw.msgC
+ log.Trace("pssrpcrw read", "msg", msg)
+ pmsg, err := pss.ToP2pMsg(msg)
+ if err != nil {
+ return p2p.Msg{}, err
+ }
+
+ return pmsg, nil
+}
+
+// If only one message slot left
+// then new is requested through handshake
+// if buffer is empty, handshake request blocks until return
+// after which pointer is changed to first new key in buffer
+// will fail if:
+// - any api calls fail
+// - handshake retries are exhausted without reply,
+// - send fails
+func (rw *pssRPCRW) WriteMsg(msg p2p.Msg) error {
+ log.Trace("got writemsg pssclient", "msg", msg)
+ if rw.closed {
+ return fmt.Errorf("connection closed")
+ }
+ rlpdata := make([]byte, msg.Size)
+ msg.Payload.Read(rlpdata)
+ pmsg, err := rlp.EncodeToBytes(pss.ProtocolMsg{
+ Code: msg.Code,
+ Size: msg.Size,
+ Payload: rlpdata,
+ })
+ if err != nil {
+ return err
+ }
+
+ // Get the keys we have
+ var symkeyids []string
+ err = rw.Client.rpc.Call(&symkeyids, "pss_getHandshakeKeys", rw.pubKeyId, rw.topic, false, true)
+ if err != nil {
+ return err
+ }
+
+ // Check the capacity of the first key
+ var symkeycap uint16
+ if len(symkeyids) > 0 {
+ err = rw.Client.rpc.Call(&symkeycap, "pss_getHandshakeKeyCapacity", symkeyids[0])
+ if err != nil {
+ return err
+ }
+ }
+
+ err = rw.Client.rpc.Call(nil, "pss_sendSym", symkeyids[0], rw.topic, hexutil.Encode(pmsg))
+ if err != nil {
+ return err
+ }
+
+ // If this is the last message it is valid for, initiate new handshake
+ if symkeycap == 1 {
+ var retries int
+ var sync bool
+ // if it's the only remaining key, make sure we don't continue until we have new ones for further writes
+ if len(symkeyids) == 1 {
+ sync = true
+ }
+ // initiate handshake
+ _, err := rw.handshake(retries, sync, false)
+ if err != nil {
+ log.Warn("failing", "err", err)
+ return err
+ }
+ }
+ return nil
+}
+
+// retry and synchronicity wrapper for handshake api call
+// returns first new symkeyid upon successful execution
+func (rw *pssRPCRW) handshake(retries int, sync bool, flush bool) (string, error) {
+
+ var symkeyids []string
+ var i int
+ // request new keys
+ // if the key buffer was depleted, make this as a blocking call and try several times before giving up
+ for i = 0; i < 1+retries; i++ {
+ log.Debug("handshake attempt pssrpcrw", "pubkeyid", rw.pubKeyId, "topic", rw.topic, "sync", sync)
+ err := rw.Client.rpc.Call(&symkeyids, "pss_handshake", rw.pubKeyId, rw.topic, sync, flush)
+ if err == nil {
+ var keyid string
+ if sync {
+ keyid = symkeyids[0]
+ }
+ return keyid, nil
+ }
+ if i-1+retries > 1 {
+ time.Sleep(time.Millisecond * handshakeRetryTimeout)
+ }
+ }
+
+ return "", fmt.Errorf("handshake failed after %d attempts", i)
+}
+
+// Custom constructor
+//
+// Provides direct access to the rpc object
+func NewClient(rpcurl string) (*Client, error) {
+ rpcclient, err := rpc.Dial(rpcurl)
+ if err != nil {
+ return nil, err
+ }
+
+ client, err := NewClientWithRPC(rpcclient)
+ if err != nil {
+ return nil, err
+ }
+ return client, nil
+}
+
+// Main constructor
+//
+// The 'rpcclient' parameter allows passing a in-memory rpc client to act as the remote websocket RPC.
+func NewClientWithRPC(rpcclient *rpc.Client) (*Client, error) {
+ client := newClient()
+ client.rpc = rpcclient
+ err := client.rpc.Call(&client.BaseAddrHex, "pss_baseAddr")
+ if err != nil {
+ return nil, fmt.Errorf("cannot get pss node baseaddress: %v", err)
+ }
+ return client, nil
+}
+
+func newClient() (client *Client) {
+ client = &Client{
+ quitC: make(chan struct{}),
+ peerPool: make(map[pss.Topic]map[string]*pssRPCRW),
+ protos: make(map[pss.Topic]*p2p.Protocol),
+ }
+ return
+}
+
+// Mounts a new devp2p protcool on the pss connection
+//
+// the protocol is aliased as a "pss topic"
+// uses normal devp2p send and incoming message handler routines from the p2p/protocols package
+//
+// when an incoming message is received from a peer that is not yet known to the client,
+// this peer object is instantiated, and the protocol is run on it.
+func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error {
+ topicobj := pss.BytesToTopic([]byte(fmt.Sprintf("%s:%d", proto.Name, proto.Version)))
+ topichex := topicobj.String()
+ msgC := make(chan pss.APIMsg)
+ c.peerPool[topicobj] = make(map[string]*pssRPCRW)
+ sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex)
+ if err != nil {
+ return fmt.Errorf("pss event subscription failed: %v", err)
+ }
+ c.subs = append(c.subs, sub)
+ err = c.rpc.Call(nil, "pss_addHandshake", topichex)
+ if err != nil {
+ return fmt.Errorf("pss handshake activation failed: %v", err)
+ }
+
+ // dispatch incoming messages
+ go func() {
+ for {
+ select {
+ case msg := <-msgC:
+ // we only allow sym msgs here
+ if msg.Asymmetric {
+ continue
+ }
+ // we get passed the symkeyid
+ // need the symkey itself to resolve to peer's pubkey
+ var pubkeyid string
+ err = c.rpc.Call(&pubkeyid, "pss_getHandshakePublicKey", msg.Key)
+ if err != nil || pubkeyid == "" {
+ log.Trace("proto err or no pubkey", "err", err, "symkeyid", msg.Key)
+ continue
+ }
+ // if we don't have the peer on this protocol already, create it
+ // this is more or less the same as AddPssPeer, less the handshake initiation
+ if c.peerPool[topicobj][pubkeyid] == nil {
+ var addrhex string
+ err := c.rpc.Call(&addrhex, "pss_getAddress", topichex, false, msg.Key)
+ if err != nil {
+ log.Trace(err.Error())
+ continue
+ }
+ addrbytes, err := hexutil.Decode(addrhex)
+ if err != nil {
+ log.Trace(err.Error())
+ break
+ }
+ addr := pss.PssAddress(addrbytes)
+ rw, err := c.newpssRPCRW(pubkeyid, addr, topicobj)
+ if err != nil {
+ break
+ }
+ c.peerPool[topicobj][pubkeyid] = rw
+ nid, _ := discover.HexID("0x00")
+ p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
+ go proto.Run(p, c.peerPool[topicobj][pubkeyid])
+ }
+ go func() {
+ c.peerPool[topicobj][pubkeyid].msgC <- msg.Msg
+ }()
+ case <-c.quitC:
+ return
+ }
+ }
+ }()
+
+ c.protos[topicobj] = proto
+ return nil
+}
+
+// Always call this to ensure that we exit cleanly
+func (c *Client) Close() error {
+ for _, s := range c.subs {
+ s.Unsubscribe()
+ }
+ return nil
+}
+
+// Add a pss peer (public key) and run the protocol on it
+//
+// client.RunProtocol with matching topic must have been
+// run prior to adding the peer, or this method will
+// return an error.
+//
+// The key must exist in the key store of the pss node
+// before the peer is added. The method will return an error
+// if it is not.
+func (c *Client) AddPssPeer(pubkeyid string, addr []byte, spec *protocols.Spec) error {
+ topic := pss.ProtocolTopic(spec)
+ if c.peerPool[topic] == nil {
+ return errors.New("addpeer on unset topic")
+ }
+ if c.peerPool[topic][pubkeyid] == nil {
+ rw, err := c.newpssRPCRW(pubkeyid, addr, topic)
+ if err != nil {
+ return err
+ }
+ _, err = rw.handshake(handshakeRetryCount, true, true)
+ if err != nil {
+ return err
+ }
+ c.poolMu.Lock()
+ c.peerPool[topic][pubkeyid] = rw
+ c.poolMu.Unlock()
+ nid, _ := discover.HexID("0x00")
+ p := p2p.NewPeer(nid, fmt.Sprintf("%v", addr), []p2p.Cap{})
+ go c.protos[topic].Run(p, c.peerPool[topic][pubkeyid])
+ }
+ return nil
+}
+
+// Remove a pss peer
+//
+// TODO: underlying cleanup
+func (c *Client) RemovePssPeer(pubkeyid string, spec *protocols.Spec) {
+ log.Debug("closing pss client peer", "pubkey", pubkeyid, "protoname", spec.Name, "protoversion", spec.Version)
+ c.poolMu.Lock()
+ defer c.poolMu.Unlock()
+ topic := pss.ProtocolTopic(spec)
+ c.peerPool[topic][pubkeyid].closed = true
+ delete(c.peerPool[topic], pubkeyid)
+}
diff --git a/swarm/pss/client/client_test.go b/swarm/pss/client/client_test.go
new file mode 100644
index 000000000..f36069877
--- /dev/null
+++ b/swarm/pss/client/client_test.go
@@ -0,0 +1,302 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package client
+
+import (
+ "bytes"
+ "context"
+ "flag"
+ "fmt"
+ "math/rand"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/simulations"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/pss"
+ "github.com/ethereum/go-ethereum/swarm/state"
+ whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
+)
+
+type protoCtrl struct {
+ C chan bool
+ protocol *pss.Protocol
+ run func(*p2p.Peer, p2p.MsgReadWriter) error
+}
+
+var (
+ debugdebugflag = flag.Bool("vv", false, "veryverbose")
+ debugflag = flag.Bool("v", false, "verbose")
+ w *whisper.Whisper
+ wapi *whisper.PublicWhisperAPI
+ // custom logging
+ psslogmain log.Logger
+ pssprotocols map[string]*protoCtrl
+ sendLimit = uint16(256)
+)
+
+var services = newServices()
+
+func init() {
+ flag.Parse()
+ rand.Seed(time.Now().Unix())
+
+ adapters.RegisterServices(services)
+
+ loglevel := log.LvlInfo
+ if *debugflag {
+ loglevel = log.LvlDebug
+ } else if *debugdebugflag {
+ loglevel = log.LvlTrace
+ }
+
+ psslogmain = log.New("psslog", "*")
+ hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
+ hf := log.LvlFilterHandler(loglevel, hs)
+ h := log.CallerFileHandler(hf)
+ log.Root().SetHandler(h)
+
+ w = whisper.New(&whisper.DefaultConfig)
+ wapi = whisper.NewPublicWhisperAPI(w)
+
+ pssprotocols = make(map[string]*protoCtrl)
+}
+
+// ping pong exchange across one expired symkey
+func TestClientHandshake(t *testing.T) {
+ sendLimit = 3
+
+ clients, err := setupNetwork(2)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ lpsc, err := NewClientWithRPC(clients[0])
+ if err != nil {
+ t.Fatal(err)
+ }
+ rpsc, err := NewClientWithRPC(clients[1])
+ if err != nil {
+ t.Fatal(err)
+ }
+ lpssping := &pss.Ping{
+ OutC: make(chan bool),
+ InC: make(chan bool),
+ Pong: false,
+ }
+ rpssping := &pss.Ping{
+ OutC: make(chan bool),
+ InC: make(chan bool),
+ Pong: false,
+ }
+ lproto := pss.NewPingProtocol(lpssping)
+ rproto := pss.NewPingProtocol(rpssping)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ err = lpsc.RunProtocol(ctx, lproto)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = rpsc.RunProtocol(ctx, rproto)
+ if err != nil {
+ t.Fatal(err)
+ }
+ topic := pss.PingTopic.String()
+
+ var loaddr string
+ err = clients[0].Call(&loaddr, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 1 baseaddr fail: %v", err)
+ }
+ var roaddr string
+ err = clients[1].Call(&roaddr, "pss_baseAddr")
+ if err != nil {
+ t.Fatalf("rpc get node 2 baseaddr fail: %v", err)
+ }
+
+ var lpubkey string
+ err = clients[0].Call(&lpubkey, "pss_getPublicKey")
+ if err != nil {
+ t.Fatalf("rpc get node 1 pubkey fail: %v", err)
+ }
+ var rpubkey string
+ err = clients[1].Call(&rpubkey, "pss_getPublicKey")
+ if err != nil {
+ t.Fatalf("rpc get node 2 pubkey fail: %v", err)
+ }
+
+ err = clients[0].Call(nil, "pss_setPeerPublicKey", rpubkey, topic, roaddr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = clients[1].Call(nil, "pss_setPeerPublicKey", lpubkey, topic, loaddr)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ time.Sleep(time.Second)
+
+ roaddrbytes, err := hexutil.Decode(roaddr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = lpsc.AddPssPeer(rpubkey, roaddrbytes, pss.PingProtocol)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ time.Sleep(time.Second)
+
+ for i := uint16(0); i <= sendLimit; i++ {
+ lpssping.OutC <- false
+ got := <-rpssping.InC
+ log.Warn("ok", "idx", i, "got", got)
+ time.Sleep(time.Second)
+ }
+
+ rw := lpsc.peerPool[pss.PingTopic][rpubkey]
+ lpsc.RemovePssPeer(rpubkey, pss.PingProtocol)
+ if err := rw.WriteMsg(p2p.Msg{
+ Size: 3,
+ Payload: bytes.NewReader([]byte("foo")),
+ }); err == nil {
+ t.Fatalf("expected error on write")
+ }
+}
+
+func setupNetwork(numnodes int) (clients []*rpc.Client, err error) {
+ nodes := make([]*simulations.Node, numnodes)
+ clients = make([]*rpc.Client, numnodes)
+ if numnodes < 2 {
+ return nil, fmt.Errorf("Minimum two nodes in network")
+ }
+ adapter := adapters.NewSimAdapter(services)
+ net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
+ ID: "0",
+ DefaultService: "bzz",
+ })
+ for i := 0; i < numnodes; i++ {
+ nodeconf := adapters.RandomNodeConfig()
+ nodeconf.Services = []string{"bzz", "pss"}
+ nodes[i], err = net.NewNodeWithConfig(nodeconf)
+ if err != nil {
+ return nil, fmt.Errorf("error creating node 1: %v", err)
+ }
+ err = net.Start(nodes[i].ID())
+ if err != nil {
+ return nil, fmt.Errorf("error starting node 1: %v", err)
+ }
+ if i > 0 {
+ err = net.Connect(nodes[i].ID(), nodes[i-1].ID())
+ if err != nil {
+ return nil, fmt.Errorf("error connecting nodes: %v", err)
+ }
+ }
+ clients[i], err = nodes[i].Client()
+ if err != nil {
+ return nil, fmt.Errorf("create node 1 rpc client fail: %v", err)
+ }
+ }
+ if numnodes > 2 {
+ err = net.Connect(nodes[0].ID(), nodes[len(nodes)-1].ID())
+ if err != nil {
+ return nil, fmt.Errorf("error connecting first and last nodes")
+ }
+ }
+ return clients, nil
+}
+
+func newServices() adapters.Services {
+ stateStore := state.NewInmemoryStore()
+ kademlias := make(map[discover.NodeID]*network.Kademlia)
+ kademlia := func(id discover.NodeID) *network.Kademlia {
+ if k, ok := kademlias[id]; ok {
+ return k
+ }
+ addr := network.NewAddrFromNodeID(id)
+ params := network.NewKadParams()
+ params.MinProxBinSize = 2
+ params.MaxBinSize = 3
+ params.MinBinSize = 1
+ params.MaxRetries = 1000
+ params.RetryExponent = 2
+ params.RetryInterval = 1000000
+ kademlias[id] = network.NewKademlia(addr.Over(), params)
+ return kademlias[id]
+ }
+ return adapters.Services{
+ "pss": func(ctx *adapters.ServiceContext) (node.Service, error) {
+ ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+ keys, err := wapi.NewKeyPair(ctxlocal)
+ privkey, err := w.GetPrivateKey(keys)
+ psparams := pss.NewPssParams().WithPrivateKey(privkey)
+ pskad := kademlia(ctx.Config.ID)
+ ps, err := pss.NewPss(pskad, psparams)
+ if err != nil {
+ return nil, err
+ }
+ pshparams := pss.NewHandshakeParams()
+ pshparams.SymKeySendLimit = sendLimit
+ err = pss.SetHandshakeController(ps, pshparams)
+ if err != nil {
+ return nil, fmt.Errorf("handshake controller fail: %v", err)
+ }
+ return ps, nil
+ },
+ "bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
+ addr := network.NewAddrFromNodeID(ctx.Config.ID)
+ hp := network.NewHiveParams()
+ hp.Discovery = false
+ config := &network.BzzConfig{
+ OverlayAddr: addr.Over(),
+ UnderlayAddr: addr.Under(),
+ HiveParams: hp,
+ }
+ return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil
+ },
+ }
+}
+
+// copied from swarm/network/protocol_test_go
+type testStore struct {
+ sync.Mutex
+
+ values map[string][]byte
+}
+
+func newTestStore() *testStore {
+ return &testStore{values: make(map[string][]byte)}
+}
+
+func (t *testStore) Load(key string) ([]byte, error) {
+ return nil, nil
+}
+
+func (t *testStore) Save(key string, v []byte) error {
+ return nil
+}
diff --git a/swarm/pss/client/doc.go b/swarm/pss/client/doc.go
new file mode 100644
index 000000000..080af45a9
--- /dev/null
+++ b/swarm/pss/client/doc.go
@@ -0,0 +1,96 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// simple abstraction for implementing pss functionality
+//
+// the pss client library aims to simplify usage of the p2p.protocols package over pss
+//
+// IO is performed using the ordinary p2p.MsgReadWriter interface, which transparently communicates with a pss node via RPC using websockets as transport layer, using methods in the PssAPI class in the swarm/pss package
+//
+//
+// Minimal-ish usage example (requires a running pss node with websocket RPC):
+//
+//
+// import (
+// "context"
+// "fmt"
+// "os"
+// pss "github.com/ethereum/go-ethereum/swarm/pss/client"
+// "github.com/ethereum/go-ethereum/p2p/protocols"
+// "github.com/ethereum/go-ethereum/p2p"
+// "github.com/ethereum/go-ethereum/swarm/pot"
+// "github.com/ethereum/go-ethereum/swarm/log"
+// )
+//
+// type FooMsg struct {
+// Bar int
+// }
+//
+//
+// func fooHandler (msg interface{}) error {
+// foomsg, ok := msg.(*FooMsg)
+// if ok {
+// log.Debug("Yay, just got a message", "msg", foomsg)
+// }
+// return errors.New(fmt.Sprintf("Unknown message"))
+// }
+//
+// spec := &protocols.Spec{
+// Name: "foo",
+// Version: 1,
+// MaxMsgSize: 1024,
+// Messages: []interface{}{
+// FooMsg{},
+// },
+// }
+//
+// proto := &p2p.Protocol{
+// Name: spec.Name,
+// Version: spec.Version,
+// Length: uint64(len(spec.Messages)),
+// Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+// pp := protocols.NewPeer(p, rw, spec)
+// return pp.Run(fooHandler)
+// },
+// }
+//
+// func implementation() {
+// cfg := pss.NewClientConfig()
+// psc := pss.NewClient(context.Background(), nil, cfg)
+// err := psc.Start()
+// if err != nil {
+// log.Crit("can't start pss client")
+// os.Exit(1)
+// }
+//
+// log.Debug("connected to pss node", "bzz addr", psc.BaseAddr)
+//
+// err = psc.RunProtocol(proto)
+// if err != nil {
+// log.Crit("can't start protocol on pss websocket")
+// os.Exit(1)
+// }
+//
+// addr := pot.RandomAddress() // should be a real address, of course
+// psc.AddPssPeer(addr, spec)
+//
+// // use the protocol for something
+//
+// psc.Stop()
+// }
+//
+// BUG(test): TestIncoming test times out due to deadlock issues in the swarm hive
+package client