aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/pss/pss_test.go
diff options
context:
space:
mode:
authorlash <nolash@users.noreply.github.com>2018-11-26 20:52:04 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2018-11-26 20:52:04 +0800
commit197d609b9a3a9b5436a9902dcc8db38bb3e7543b (patch)
tree5a5e6650aed71fbff63cffd62f84e2bb2de0e8cc /swarm/pss/pss_test.go
parentca228569e4aedd5ebbe853ec61761561d86579a6 (diff)
downloadgo-tangerine-197d609b9a3a9b5436a9902dcc8db38bb3e7543b.tar.gz
go-tangerine-197d609b9a3a9b5436a9902dcc8db38bb3e7543b.tar.zst
go-tangerine-197d609b9a3a9b5436a9902dcc8db38bb3e7543b.zip
swarm/pss: Message handler refactor (#18169)
Diffstat (limited to 'swarm/pss/pss_test.go')
-rw-r--r--swarm/pss/pss_test.go493
1 files changed, 446 insertions, 47 deletions
diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go
index 66a90be62..32404aaaf 100644
--- a/swarm/pss/pss_test.go
+++ b/swarm/pss/pss_test.go
@@ -48,20 +48,23 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network"
+ "github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/state"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)
var (
- initOnce = sync.Once{}
- debugdebugflag = flag.Bool("vv", false, "veryverbose")
- debugflag = flag.Bool("v", false, "verbose")
- longrunning = flag.Bool("longrunning", false, "do run long-running tests")
- w *whisper.Whisper
- wapi *whisper.PublicWhisperAPI
- psslogmain log.Logger
- pssprotocols map[string]*protoCtrl
- useHandshake bool
+ initOnce = sync.Once{}
+ loglevel = flag.Int("loglevel", 2, "logging verbosity")
+ longrunning = flag.Bool("longrunning", false, "do run long-running tests")
+ w *whisper.Whisper
+ wapi *whisper.PublicWhisperAPI
+ psslogmain log.Logger
+ pssprotocols map[string]*protoCtrl
+ useHandshake bool
+ noopHandlerFunc = func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
+ return nil
+ }
)
func init() {
@@ -75,16 +78,9 @@ func init() {
func initTest() {
initOnce.Do(
func() {
- loglevel := log.LvlInfo
- if *debugflag {
- loglevel = log.LvlDebug
- } else if *debugdebugflag {
- loglevel = log.LvlTrace
- }
-
psslogmain = log.New("psslog", "*")
hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
- hf := log.LvlFilterHandler(loglevel, hs)
+ hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs)
h := log.CallerFileHandler(hf)
log.Root().SetHandler(h)
@@ -280,15 +276,14 @@ func TestAddressMatch(t *testing.T) {
}
pssmsg := &PssMsg{
- To: remoteaddr,
- Payload: &whisper.Envelope{},
+ To: remoteaddr,
}
// differ from first byte
if ps.isSelfRecipient(pssmsg) {
t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
}
- if ps.isSelfPossibleRecipient(pssmsg) {
+ if ps.isSelfPossibleRecipient(pssmsg, false) {
t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8])
}
@@ -297,7 +292,7 @@ func TestAddressMatch(t *testing.T) {
if ps.isSelfRecipient(pssmsg) {
t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr)
}
- if !ps.isSelfPossibleRecipient(pssmsg) {
+ if !ps.isSelfPossibleRecipient(pssmsg, false) {
t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
}
@@ -306,13 +301,342 @@ func TestAddressMatch(t *testing.T) {
if !ps.isSelfRecipient(pssmsg) {
t.Fatalf("isSelfRecipient false but %x == %x", remoteaddr, localaddr)
}
- if !ps.isSelfPossibleRecipient(pssmsg) {
+ if !ps.isSelfPossibleRecipient(pssmsg, false) {
t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8])
}
+
}
-//
-func TestHandlerConditions(t *testing.T) {
+// test that message is handled by sender if a prox handler exists and sender is in prox of message
+func TestProxShortCircuit(t *testing.T) {
+
+ // sender node address
+ localAddr := network.RandomAddr().Over()
+ localPotAddr := pot.NewAddressFromBytes(localAddr)
+
+ // set up kademlia
+ kadParams := network.NewKadParams()
+ kad := network.NewKademlia(localAddr, kadParams)
+ peerCount := kad.MinBinSize + 1
+
+ // set up pss
+ privKey, err := crypto.GenerateKey()
+ pssp := NewPssParams().WithPrivateKey(privKey)
+ ps, err := NewPss(kad, pssp)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+
+ // create kademlia peers, so we have peers both inside and outside minproxlimit
+ var peers []*network.Peer
+ proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes()
+ distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes()
+
+ for i := 0; i < peerCount; i++ {
+ rw := &p2p.MsgPipeRW{}
+ ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{})
+ protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{})
+ peerAddr := pot.RandomAddressAt(localPotAddr, i)
+ bzzPeer := &network.BzzPeer{
+ Peer: protoPeer,
+ BzzAddr: &network.BzzAddr{
+ OAddr: peerAddr.Bytes(),
+ UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])),
+ },
+ }
+ peer := network.NewPeer(bzzPeer, kad)
+ kad.On(peer)
+ peers = append(peers, peer)
+ }
+
+ // register it marking prox capability
+ delivered := make(chan struct{})
+ rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
+ log.Trace("in allowraw handler")
+ delivered <- struct{}{}
+ return nil
+ }
+ topic := BytesToTopic([]byte{0x2a})
+ hndlrProxDereg := ps.Register(&topic, &handler{
+ f: rawHandlerFunc,
+ caps: &handlerCaps{
+ raw: true,
+ prox: true,
+ },
+ })
+ defer hndlrProxDereg()
+
+ // send message too far away for sender to be in prox
+ // reception of this message should time out
+ errC := make(chan error)
+ go func() {
+ err := ps.SendRaw(distantMessageAddress, topic, []byte("foo"))
+ if err != nil {
+ errC <- err
+ }
+ }()
+
+ ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
+ defer cancel()
+ select {
+ case <-delivered:
+ t.Fatal("raw distant message delivered")
+ case err := <-errC:
+ t.Fatal(err)
+ case <-ctx.Done():
+ }
+
+ // send message that should be within sender prox
+ // this message should be delivered
+ go func() {
+ err := ps.SendRaw(proxMessageAddress, topic, []byte("bar"))
+ if err != nil {
+ errC <- err
+ }
+ }()
+
+ ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
+ defer cancel()
+ select {
+ case <-delivered:
+ case err := <-errC:
+ t.Fatal(err)
+ case <-ctx.Done():
+ t.Fatal("raw timeout")
+ }
+
+ // try the same prox message with sym and asym send
+ proxAddrPss := PssAddress(proxMessageAddress)
+ symKeyId, err := ps.GenerateSymmetricKey(topic, &proxAddrPss, true)
+ go func() {
+ err := ps.SendSym(symKeyId, topic, []byte("baz"))
+ if err != nil {
+ errC <- err
+ }
+ }()
+ ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
+ defer cancel()
+ select {
+ case <-delivered:
+ case err := <-errC:
+ t.Fatal(err)
+ case <-ctx.Done():
+ t.Fatal("sym timeout")
+ }
+
+ err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &proxAddrPss)
+ if err != nil {
+ t.Fatal(err)
+ }
+ pubKeyId := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey))
+ go func() {
+ err := ps.SendAsym(pubKeyId, topic, []byte("xyzzy"))
+ if err != nil {
+ errC <- err
+ }
+ }()
+ ctx, cancel = context.WithTimeout(context.TODO(), time.Second)
+ defer cancel()
+ select {
+ case <-delivered:
+ case err := <-errC:
+ t.Fatal(err)
+ case <-ctx.Done():
+ t.Fatal("asym timeout")
+ }
+}
+
+// verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it
+// note that in these tests we use the raw capability on handlers for convenience
+func TestAddressMatchProx(t *testing.T) {
+
+ // recipient node address
+ localAddr := network.RandomAddr().Over()
+ localPotAddr := pot.NewAddressFromBytes(localAddr)
+
+ // set up kademlia
+ kadparams := network.NewKadParams()
+ kad := network.NewKademlia(localAddr, kadparams)
+ nnPeerCount := kad.MinBinSize
+ peerCount := nnPeerCount + 2
+
+ // set up pss
+ privKey, err := crypto.GenerateKey()
+ pssp := NewPssParams().WithPrivateKey(privKey)
+ ps, err := NewPss(kad, pssp)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+
+ // create kademlia peers, so we have peers both inside and outside minproxlimit
+ var peers []*network.Peer
+ for i := 0; i < peerCount; i++ {
+ rw := &p2p.MsgPipeRW{}
+ ptpPeer := p2p.NewPeer(enode.ID{}, "362436 call me anytime", []p2p.Cap{})
+ protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{})
+ peerAddr := pot.RandomAddressAt(localPotAddr, i)
+ bzzPeer := &network.BzzPeer{
+ Peer: protoPeer,
+ BzzAddr: &network.BzzAddr{
+ OAddr: peerAddr.Bytes(),
+ UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])),
+ },
+ }
+ peer := network.NewPeer(bzzPeer, kad)
+ kad.On(peer)
+ peers = append(peers, peer)
+ }
+
+ // TODO: create a test in the network package to make a table with n peers where n-m are proxpeers
+ // meanwhile test regression for kademlia since we are compiling the test parameters from different packages
+ var proxes int
+ var conns int
+ kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool {
+ conns++
+ if prox {
+ proxes++
+ }
+ log.Trace("kadconn", "po", po, "peer", p, "prox", prox)
+ return true
+ })
+ if proxes != nnPeerCount {
+ t.Fatalf("expected %d proxpeers, have %d", nnPeerCount, proxes)
+ } else if conns != peerCount {
+ t.Fatalf("expected %d peers total, have %d", peerCount, proxes)
+ }
+
+ // remote address distances from localAddr to try and the expected outcomes if we use prox handler
+ remoteDistances := []int{
+ 255,
+ nnPeerCount + 1,
+ nnPeerCount,
+ nnPeerCount - 1,
+ 0,
+ }
+ expects := []bool{
+ true,
+ true,
+ true,
+ false,
+ false,
+ }
+
+ // first the unit test on the method that calculates possible receipient using prox
+ for i, distance := range remoteDistances {
+ pssMsg := newPssMsg(&msgParams{})
+ pssMsg.To = make([]byte, len(localAddr))
+ copy(pssMsg.To, localAddr)
+ var byteIdx = distance / 8
+ pssMsg.To[byteIdx] ^= 1 << uint(7-(distance%8))
+ log.Trace(fmt.Sprintf("addrmatch %v", bytes.Equal(pssMsg.To, localAddr)))
+ if ps.isSelfPossibleRecipient(pssMsg, true) != expects[i] {
+ t.Fatalf("expected distance %d to be %v", distance, expects[i])
+ }
+ }
+
+ // we move up to higher level and test the actual message handler
+ // for each distance check if we are possible recipient when prox variant is used is set
+
+ // this handler will increment a counter for every message that gets passed to the handler
+ var receives int
+ rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
+ log.Trace("in allowraw handler")
+ receives++
+ return nil
+ }
+
+ // register it marking prox capability
+ topic := BytesToTopic([]byte{0x2a})
+ hndlrProxDereg := ps.Register(&topic, &handler{
+ f: rawHandlerFunc,
+ caps: &handlerCaps{
+ raw: true,
+ prox: true,
+ },
+ })
+
+ // test the distances
+ var prevReceive int
+ for i, distance := range remoteDistances {
+ remotePotAddr := pot.RandomAddressAt(localPotAddr, distance)
+ remoteAddr := remotePotAddr.Bytes()
+
+ var data [32]byte
+ rand.Read(data[:])
+ pssMsg := newPssMsg(&msgParams{raw: true})
+ pssMsg.To = remoteAddr
+ pssMsg.Expire = uint32(time.Now().Unix() + 4200)
+ pssMsg.Payload = &whisper.Envelope{
+ Topic: whisper.TopicType(topic),
+ Data: data[:],
+ }
+
+ log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr)
+ ps.handlePssMsg(context.TODO(), pssMsg)
+ if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) {
+ t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i])
+ }
+ prevReceive = receives
+ }
+
+ // now add a non prox-capable handler and test
+ ps.Register(&topic, &handler{
+ f: rawHandlerFunc,
+ caps: &handlerCaps{
+ raw: true,
+ },
+ })
+ receives = 0
+ prevReceive = 0
+ for i, distance := range remoteDistances {
+ remotePotAddr := pot.RandomAddressAt(localPotAddr, distance)
+ remoteAddr := remotePotAddr.Bytes()
+
+ var data [32]byte
+ rand.Read(data[:])
+ pssMsg := newPssMsg(&msgParams{raw: true})
+ pssMsg.To = remoteAddr
+ pssMsg.Expire = uint32(time.Now().Unix() + 4200)
+ pssMsg.Payload = &whisper.Envelope{
+ Topic: whisper.TopicType(topic),
+ Data: data[:],
+ }
+
+ log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr)
+ ps.handlePssMsg(context.TODO(), pssMsg)
+ if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) {
+ t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i])
+ }
+ prevReceive = receives
+ }
+
+ // now deregister the prox capable handler, now none of the messages will be handled
+ hndlrProxDereg()
+ receives = 0
+
+ for _, distance := range remoteDistances {
+ remotePotAddr := pot.RandomAddressAt(localPotAddr, distance)
+ remoteAddr := remotePotAddr.Bytes()
+
+ pssMsg := newPssMsg(&msgParams{raw: true})
+ pssMsg.To = remoteAddr
+ pssMsg.Expire = uint32(time.Now().Unix() + 4200)
+ pssMsg.Payload = &whisper.Envelope{
+ Topic: whisper.TopicType(topic),
+ Data: []byte(remotePotAddr.String()),
+ }
+
+ log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr)
+ ps.handlePssMsg(context.TODO(), pssMsg)
+ if receives != 0 {
+ t.Fatalf("expected distance %d to not be recipient when prox is not set for handler", distance)
+ }
+
+ }
+}
+
+// verify that message queueing happens when it should, and that expired and corrupt messages are dropped
+func TestMessageProcessing(t *testing.T) {
t.Skip("Disabled due to probable faulty logic for outbox expectations")
// setup
@@ -326,13 +650,12 @@ func TestHandlerConditions(t *testing.T) {
ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams())
// message should pass
- msg := &PssMsg{
- To: addr,
- Expire: uint32(time.Now().Add(time.Second * 60).Unix()),
- Payload: &whisper.Envelope{
- Topic: [4]byte{},
- Data: []byte{0x66, 0x6f, 0x6f},
- },
+ msg := newPssMsg(&msgParams{})
+ msg.To = addr
+ msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix())
+ msg.Payload = &whisper.Envelope{
+ Topic: [4]byte{},
+ Data: []byte{0x66, 0x6f, 0x6f},
}
if err := ps.handlePssMsg(context.TODO(), msg); err != nil {
t.Fatal(err.Error())
@@ -498,6 +821,7 @@ func TestKeys(t *testing.T) {
}
}
+// check that we can retrieve previously added public key entires per topic and peer
func TestGetPublickeyEntries(t *testing.T) {
privkey, err := crypto.GenerateKey()
@@ -557,7 +881,7 @@ OUTER:
}
// forwarding should skip peers that do not have matching pss capabilities
-func TestMismatch(t *testing.T) {
+func TestPeerCapabilityMismatch(t *testing.T) {
// create privkey for forwarder node
privkey, err := crypto.GenerateKey()
@@ -615,6 +939,76 @@ func TestMismatch(t *testing.T) {
}
+// verifies that message handlers for raw messages only are invoked when minimum one handler for the topic exists in which raw messages are explicitly allowed
+func TestRawAllow(t *testing.T) {
+
+ // set up pss like so many times before
+ privKey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatal(err)
+ }
+ baseAddr := network.RandomAddr()
+ kad := network.NewKademlia((baseAddr).Over(), network.NewKadParams())
+ ps := newTestPss(privKey, kad, nil)
+ topic := BytesToTopic([]byte{0x2a})
+
+ // create handler innards that increments every time a message hits it
+ var receives int
+ rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
+ log.Trace("in allowraw handler")
+ receives++
+ return nil
+ }
+
+ // wrap this handler function with a handler without raw capability and register it
+ hndlrNoRaw := &handler{
+ f: rawHandlerFunc,
+ }
+ ps.Register(&topic, hndlrNoRaw)
+
+ // test it with a raw message, should be poo-poo
+ pssMsg := newPssMsg(&msgParams{
+ raw: true,
+ })
+ pssMsg.To = baseAddr.OAddr
+ pssMsg.Expire = uint32(time.Now().Unix() + 4200)
+ pssMsg.Payload = &whisper.Envelope{
+ Topic: whisper.TopicType(topic),
+ }
+ ps.handlePssMsg(context.TODO(), pssMsg)
+ if receives > 0 {
+ t.Fatalf("Expected handler not to be executed with raw cap off")
+ }
+
+ // now wrap the same handler function with raw capabilities and register it
+ hndlrRaw := &handler{
+ f: rawHandlerFunc,
+ caps: &handlerCaps{
+ raw: true,
+ },
+ }
+ deregRawHandler := ps.Register(&topic, hndlrRaw)
+
+ // should work now
+ pssMsg.Payload.Data = []byte("Raw Deal")
+ ps.handlePssMsg(context.TODO(), pssMsg)
+ if receives == 0 {
+ t.Fatalf("Expected handler to be executed with raw cap on")
+ }
+
+ // now deregister the raw capable handler
+ prevReceives := receives
+ deregRawHandler()
+
+ // check that raw messages fail again
+ pssMsg.Payload.Data = []byte("Raw Trump")
+ ps.handlePssMsg(context.TODO(), pssMsg)
+ if receives != prevReceives {
+ t.Fatalf("Expected handler not to be executed when raw handler is retracted")
+ }
+}
+
+// verifies that nodes can send and receive raw (verbatim) messages
func TestSendRaw(t *testing.T) {
t.Run("32", testSendRaw)
t.Run("8", testSendRaw)
@@ -658,13 +1052,13 @@ func testSendRaw(t *testing.T) {
lmsgC := make(chan APIMsg)
lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
defer lcancel()
- lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
+ lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true, false)
log.Trace("lsub", "id", lsub)
defer lsub.Unsubscribe()
rmsgC := make(chan APIMsg)
rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
defer rcancel()
- rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true, false)
log.Trace("rsub", "id", rsub)
defer rsub.Unsubscribe()
@@ -757,13 +1151,13 @@ func testSendSym(t *testing.T) {
lmsgC := make(chan APIMsg)
lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
defer lcancel()
- lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
+ lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
log.Trace("lsub", "id", lsub)
defer lsub.Unsubscribe()
rmsgC := make(chan APIMsg)
rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
defer rcancel()
- rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
log.Trace("rsub", "id", rsub)
defer rsub.Unsubscribe()
@@ -872,13 +1266,13 @@ func testSendAsym(t *testing.T) {
lmsgC := make(chan APIMsg)
lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10)
defer lcancel()
- lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
+ lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
log.Trace("lsub", "id", lsub)
defer lsub.Unsubscribe()
rmsgC := make(chan APIMsg)
rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10)
defer rcancel()
- rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
log.Trace("rsub", "id", rsub)
defer rsub.Unsubscribe()
@@ -1037,7 +1431,7 @@ func testNetwork(t *testing.T) {
msgC := make(chan APIMsg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic)
+ sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false, false)
if err != nil {
t.Fatal(err)
}
@@ -1209,7 +1603,7 @@ func TestDeduplication(t *testing.T) {
rmsgC := make(chan APIMsg)
rctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
- rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
log.Trace("rsub", "id", rsub)
defer rsub.Unsubscribe()
@@ -1392,8 +1786,8 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
if err != nil {
b.Fatalf("could not generate whisper envelope: %v", err)
}
- ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
- return nil
+ ps.Register(&topic, &handler{
+ f: noopHandlerFunc,
})
pssmsgs = append(pssmsgs, &PssMsg{
To: to,
@@ -1402,7 +1796,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
- if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1]); err != nil {
+ if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1], false, false); err != nil {
b.Fatalf("pss processing failed: %v", err)
}
}
@@ -1476,15 +1870,15 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) {
if err != nil {
b.Fatalf("could not generate whisper envelope: %v", err)
}
- ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
- return nil
+ ps.Register(&topic, &handler{
+ f: noopHandlerFunc,
})
pssmsg := &PssMsg{
To: addr[len(addr)-1][:],
Payload: env,
}
for i := 0; i < b.N; i++ {
- if err := ps.process(pssmsg); err != nil {
+ if err := ps.process(pssmsg, false, false); err != nil {
b.Fatalf("pss processing failed: %v", err)
}
}
@@ -1581,7 +1975,12 @@ func newServices(allowRaw bool) adapters.Services {
if useHandshake {
SetHandshakeController(ps, NewHandshakeParams())
}
- ps.Register(&PingTopic, pp.Handle)
+ ps.Register(&PingTopic, &handler{
+ f: pp.Handle,
+ caps: &handlerCaps{
+ raw: true,
+ },
+ })
ps.addAPI(rpc.API{
Namespace: "psstest",
Version: "0.3",