diff options
Diffstat (limited to 'swarm/pss/pss_test.go')
-rw-r--r-- | swarm/pss/pss_test.go | 493 |
1 files changed, 446 insertions, 47 deletions
diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 66a90be62..32404aaaf 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -48,20 +48,23 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" ) var ( - initOnce = sync.Once{} - debugdebugflag = flag.Bool("vv", false, "veryverbose") - debugflag = flag.Bool("v", false, "verbose") - longrunning = flag.Bool("longrunning", false, "do run long-running tests") - w *whisper.Whisper - wapi *whisper.PublicWhisperAPI - psslogmain log.Logger - pssprotocols map[string]*protoCtrl - useHandshake bool + initOnce = sync.Once{} + loglevel = flag.Int("loglevel", 2, "logging verbosity") + longrunning = flag.Bool("longrunning", false, "do run long-running tests") + w *whisper.Whisper + wapi *whisper.PublicWhisperAPI + psslogmain log.Logger + pssprotocols map[string]*protoCtrl + useHandshake bool + noopHandlerFunc = func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + return nil + } ) func init() { @@ -75,16 +78,9 @@ func init() { func initTest() { initOnce.Do( func() { - loglevel := log.LvlInfo - if *debugflag { - loglevel = log.LvlDebug - } else if *debugdebugflag { - loglevel = log.LvlTrace - } - psslogmain = log.New("psslog", "*") hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true)) - hf := log.LvlFilterHandler(loglevel, hs) + hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs) h := log.CallerFileHandler(hf) log.Root().SetHandler(h) @@ -280,15 +276,14 @@ func TestAddressMatch(t *testing.T) { } pssmsg := &PssMsg{ - To: remoteaddr, - Payload: &whisper.Envelope{}, + To: remoteaddr, } // differ from first byte if ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr) } - if ps.isSelfPossibleRecipient(pssmsg) { + if ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient true but %x != %x", remoteaddr[:8], localaddr[:8]) } @@ -297,7 +292,7 @@ func TestAddressMatch(t *testing.T) { if ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient true but %x != %x", remoteaddr, localaddr) } - if !ps.isSelfPossibleRecipient(pssmsg) { + if !ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8]) } @@ -306,13 +301,342 @@ func TestAddressMatch(t *testing.T) { if !ps.isSelfRecipient(pssmsg) { t.Fatalf("isSelfRecipient false but %x == %x", remoteaddr, localaddr) } - if !ps.isSelfPossibleRecipient(pssmsg) { + if !ps.isSelfPossibleRecipient(pssmsg, false) { t.Fatalf("isSelfPossibleRecipient false but %x == %x", remoteaddr[:8], localaddr[:8]) } + } -// -func TestHandlerConditions(t *testing.T) { +// test that message is handled by sender if a prox handler exists and sender is in prox of message +func TestProxShortCircuit(t *testing.T) { + + // sender node address + localAddr := network.RandomAddr().Over() + localPotAddr := pot.NewAddressFromBytes(localAddr) + + // set up kademlia + kadParams := network.NewKadParams() + kad := network.NewKademlia(localAddr, kadParams) + peerCount := kad.MinBinSize + 1 + + // set up pss + privKey, err := crypto.GenerateKey() + pssp := NewPssParams().WithPrivateKey(privKey) + ps, err := NewPss(kad, pssp) + if err != nil { + t.Fatal(err.Error()) + } + + // create kademlia peers, so we have peers both inside and outside minproxlimit + var peers []*network.Peer + proxMessageAddress := pot.RandomAddressAt(localPotAddr, peerCount).Bytes() + distantMessageAddress := pot.RandomAddressAt(localPotAddr, 0).Bytes() + + for i := 0; i < peerCount; i++ { + rw := &p2p.MsgPipeRW{} + ptpPeer := p2p.NewPeer(enode.ID{}, "wanna be with me? [ ] yes [ ] no", []p2p.Cap{}) + protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{}) + peerAddr := pot.RandomAddressAt(localPotAddr, i) + bzzPeer := &network.BzzPeer{ + Peer: protoPeer, + BzzAddr: &network.BzzAddr{ + OAddr: peerAddr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])), + }, + } + peer := network.NewPeer(bzzPeer, kad) + kad.On(peer) + peers = append(peers, peer) + } + + // register it marking prox capability + delivered := make(chan struct{}) + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + delivered <- struct{}{} + return nil + } + topic := BytesToTopic([]byte{0x2a}) + hndlrProxDereg := ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + prox: true, + }, + }) + defer hndlrProxDereg() + + // send message too far away for sender to be in prox + // reception of this message should time out + errC := make(chan error) + go func() { + err := ps.SendRaw(distantMessageAddress, topic, []byte("foo")) + if err != nil { + errC <- err + } + }() + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + t.Fatal("raw distant message delivered") + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + } + + // send message that should be within sender prox + // this message should be delivered + go func() { + err := ps.SendRaw(proxMessageAddress, topic, []byte("bar")) + if err != nil { + errC <- err + } + }() + + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("raw timeout") + } + + // try the same prox message with sym and asym send + proxAddrPss := PssAddress(proxMessageAddress) + symKeyId, err := ps.GenerateSymmetricKey(topic, &proxAddrPss, true) + go func() { + err := ps.SendSym(symKeyId, topic, []byte("baz")) + if err != nil { + errC <- err + } + }() + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("sym timeout") + } + + err = ps.SetPeerPublicKey(&privKey.PublicKey, topic, &proxAddrPss) + if err != nil { + t.Fatal(err) + } + pubKeyId := hexutil.Encode(crypto.FromECDSAPub(&privKey.PublicKey)) + go func() { + err := ps.SendAsym(pubKeyId, topic, []byte("xyzzy")) + if err != nil { + errC <- err + } + }() + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-delivered: + case err := <-errC: + t.Fatal(err) + case <-ctx.Done(): + t.Fatal("asym timeout") + } +} + +// verify that node can be set as recipient regardless of explicit message address match if minimum one handler of a topic is explicitly set to allow it +// note that in these tests we use the raw capability on handlers for convenience +func TestAddressMatchProx(t *testing.T) { + + // recipient node address + localAddr := network.RandomAddr().Over() + localPotAddr := pot.NewAddressFromBytes(localAddr) + + // set up kademlia + kadparams := network.NewKadParams() + kad := network.NewKademlia(localAddr, kadparams) + nnPeerCount := kad.MinBinSize + peerCount := nnPeerCount + 2 + + // set up pss + privKey, err := crypto.GenerateKey() + pssp := NewPssParams().WithPrivateKey(privKey) + ps, err := NewPss(kad, pssp) + if err != nil { + t.Fatal(err.Error()) + } + + // create kademlia peers, so we have peers both inside and outside minproxlimit + var peers []*network.Peer + for i := 0; i < peerCount; i++ { + rw := &p2p.MsgPipeRW{} + ptpPeer := p2p.NewPeer(enode.ID{}, "362436 call me anytime", []p2p.Cap{}) + protoPeer := protocols.NewPeer(ptpPeer, rw, &protocols.Spec{}) + peerAddr := pot.RandomAddressAt(localPotAddr, i) + bzzPeer := &network.BzzPeer{ + Peer: protoPeer, + BzzAddr: &network.BzzAddr{ + OAddr: peerAddr.Bytes(), + UAddr: []byte(fmt.Sprintf("%x", peerAddr[:])), + }, + } + peer := network.NewPeer(bzzPeer, kad) + kad.On(peer) + peers = append(peers, peer) + } + + // TODO: create a test in the network package to make a table with n peers where n-m are proxpeers + // meanwhile test regression for kademlia since we are compiling the test parameters from different packages + var proxes int + var conns int + kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool { + conns++ + if prox { + proxes++ + } + log.Trace("kadconn", "po", po, "peer", p, "prox", prox) + return true + }) + if proxes != nnPeerCount { + t.Fatalf("expected %d proxpeers, have %d", nnPeerCount, proxes) + } else if conns != peerCount { + t.Fatalf("expected %d peers total, have %d", peerCount, proxes) + } + + // remote address distances from localAddr to try and the expected outcomes if we use prox handler + remoteDistances := []int{ + 255, + nnPeerCount + 1, + nnPeerCount, + nnPeerCount - 1, + 0, + } + expects := []bool{ + true, + true, + true, + false, + false, + } + + // first the unit test on the method that calculates possible receipient using prox + for i, distance := range remoteDistances { + pssMsg := newPssMsg(&msgParams{}) + pssMsg.To = make([]byte, len(localAddr)) + copy(pssMsg.To, localAddr) + var byteIdx = distance / 8 + pssMsg.To[byteIdx] ^= 1 << uint(7-(distance%8)) + log.Trace(fmt.Sprintf("addrmatch %v", bytes.Equal(pssMsg.To, localAddr))) + if ps.isSelfPossibleRecipient(pssMsg, true) != expects[i] { + t.Fatalf("expected distance %d to be %v", distance, expects[i]) + } + } + + // we move up to higher level and test the actual message handler + // for each distance check if we are possible recipient when prox variant is used is set + + // this handler will increment a counter for every message that gets passed to the handler + var receives int + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + receives++ + return nil + } + + // register it marking prox capability + topic := BytesToTopic([]byte{0x2a}) + hndlrProxDereg := ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + prox: true, + }, + }) + + // test the distances + var prevReceive int + for i, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + var data [32]byte + rand.Read(data[:]) + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: data[:], + } + + log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { + t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) + } + prevReceive = receives + } + + // now add a non prox-capable handler and test + ps.Register(&topic, &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + }, + }) + receives = 0 + prevReceive = 0 + for i, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + var data [32]byte + rand.Read(data[:]) + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: data[:], + } + + log.Trace("withprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if (!expects[i] && prevReceive != receives) || (expects[i] && prevReceive == receives) { + t.Fatalf("expected distance %d recipient %v when prox is set for handler", distance, expects[i]) + } + prevReceive = receives + } + + // now deregister the prox capable handler, now none of the messages will be handled + hndlrProxDereg() + receives = 0 + + for _, distance := range remoteDistances { + remotePotAddr := pot.RandomAddressAt(localPotAddr, distance) + remoteAddr := remotePotAddr.Bytes() + + pssMsg := newPssMsg(&msgParams{raw: true}) + pssMsg.To = remoteAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + Data: []byte(remotePotAddr.String()), + } + + log.Trace("noprox addrs", "local", localAddr, "remote", remoteAddr) + ps.handlePssMsg(context.TODO(), pssMsg) + if receives != 0 { + t.Fatalf("expected distance %d to not be recipient when prox is not set for handler", distance) + } + + } +} + +// verify that message queueing happens when it should, and that expired and corrupt messages are dropped +func TestMessageProcessing(t *testing.T) { t.Skip("Disabled due to probable faulty logic for outbox expectations") // setup @@ -326,13 +650,12 @@ func TestHandlerConditions(t *testing.T) { ps := newTestPss(privkey, network.NewKademlia(addr, network.NewKadParams()), NewPssParams()) // message should pass - msg := &PssMsg{ - To: addr, - Expire: uint32(time.Now().Add(time.Second * 60).Unix()), - Payload: &whisper.Envelope{ - Topic: [4]byte{}, - Data: []byte{0x66, 0x6f, 0x6f}, - }, + msg := newPssMsg(&msgParams{}) + msg.To = addr + msg.Expire = uint32(time.Now().Add(time.Second * 60).Unix()) + msg.Payload = &whisper.Envelope{ + Topic: [4]byte{}, + Data: []byte{0x66, 0x6f, 0x6f}, } if err := ps.handlePssMsg(context.TODO(), msg); err != nil { t.Fatal(err.Error()) @@ -498,6 +821,7 @@ func TestKeys(t *testing.T) { } } +// check that we can retrieve previously added public key entires per topic and peer func TestGetPublickeyEntries(t *testing.T) { privkey, err := crypto.GenerateKey() @@ -557,7 +881,7 @@ OUTER: } // forwarding should skip peers that do not have matching pss capabilities -func TestMismatch(t *testing.T) { +func TestPeerCapabilityMismatch(t *testing.T) { // create privkey for forwarder node privkey, err := crypto.GenerateKey() @@ -615,6 +939,76 @@ func TestMismatch(t *testing.T) { } +// verifies that message handlers for raw messages only are invoked when minimum one handler for the topic exists in which raw messages are explicitly allowed +func TestRawAllow(t *testing.T) { + + // set up pss like so many times before + privKey, err := crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + baseAddr := network.RandomAddr() + kad := network.NewKademlia((baseAddr).Over(), network.NewKadParams()) + ps := newTestPss(privKey, kad, nil) + topic := BytesToTopic([]byte{0x2a}) + + // create handler innards that increments every time a message hits it + var receives int + rawHandlerFunc := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { + log.Trace("in allowraw handler") + receives++ + return nil + } + + // wrap this handler function with a handler without raw capability and register it + hndlrNoRaw := &handler{ + f: rawHandlerFunc, + } + ps.Register(&topic, hndlrNoRaw) + + // test it with a raw message, should be poo-poo + pssMsg := newPssMsg(&msgParams{ + raw: true, + }) + pssMsg.To = baseAddr.OAddr + pssMsg.Expire = uint32(time.Now().Unix() + 4200) + pssMsg.Payload = &whisper.Envelope{ + Topic: whisper.TopicType(topic), + } + ps.handlePssMsg(context.TODO(), pssMsg) + if receives > 0 { + t.Fatalf("Expected handler not to be executed with raw cap off") + } + + // now wrap the same handler function with raw capabilities and register it + hndlrRaw := &handler{ + f: rawHandlerFunc, + caps: &handlerCaps{ + raw: true, + }, + } + deregRawHandler := ps.Register(&topic, hndlrRaw) + + // should work now + pssMsg.Payload.Data = []byte("Raw Deal") + ps.handlePssMsg(context.TODO(), pssMsg) + if receives == 0 { + t.Fatalf("Expected handler to be executed with raw cap on") + } + + // now deregister the raw capable handler + prevReceives := receives + deregRawHandler() + + // check that raw messages fail again + pssMsg.Payload.Data = []byte("Raw Trump") + ps.handlePssMsg(context.TODO(), pssMsg) + if receives != prevReceives { + t.Fatalf("Expected handler not to be executed when raw handler is retracted") + } +} + +// verifies that nodes can send and receive raw (verbatim) messages func TestSendRaw(t *testing.T) { t.Run("32", testSendRaw) t.Run("8", testSendRaw) @@ -658,13 +1052,13 @@ func testSendRaw(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, true, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, true, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -757,13 +1151,13 @@ func testSendSym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -872,13 +1266,13 @@ func testSendAsym(t *testing.T) { lmsgC := make(chan APIMsg) lctx, lcancel := context.WithTimeout(context.Background(), time.Second*10) defer lcancel() - lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false) log.Trace("lsub", "id", lsub) defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, rcancel := context.WithTimeout(context.Background(), time.Second*10) defer rcancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1037,7 +1431,7 @@ func testNetwork(t *testing.T) { msgC := make(chan APIMsg) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic) + sub, err := rpcclient.Subscribe(ctx, "pss", msgC, "receive", topic, false, false) if err != nil { t.Fatal(err) } @@ -1209,7 +1603,7 @@ func TestDeduplication(t *testing.T) { rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() - rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false) log.Trace("rsub", "id", rsub) defer rsub.Unsubscribe() @@ -1392,8 +1786,8 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { if err != nil { b.Fatalf("could not generate whisper envelope: %v", err) } - ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil + ps.Register(&topic, &handler{ + f: noopHandlerFunc, }) pssmsgs = append(pssmsgs, &PssMsg{ To: to, @@ -1402,7 +1796,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1]); err != nil { + if err := ps.process(pssmsgs[len(pssmsgs)-(i%len(pssmsgs))-1], false, false); err != nil { b.Fatalf("pss processing failed: %v", err) } } @@ -1476,15 +1870,15 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) { if err != nil { b.Fatalf("could not generate whisper envelope: %v", err) } - ps.Register(&topic, func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { - return nil + ps.Register(&topic, &handler{ + f: noopHandlerFunc, }) pssmsg := &PssMsg{ To: addr[len(addr)-1][:], Payload: env, } for i := 0; i < b.N; i++ { - if err := ps.process(pssmsg); err != nil { + if err := ps.process(pssmsg, false, false); err != nil { b.Fatalf("pss processing failed: %v", err) } } @@ -1581,7 +1975,12 @@ func newServices(allowRaw bool) adapters.Services { if useHandshake { SetHandshakeController(ps, NewHandshakeParams()) } - ps.Register(&PingTopic, pp.Handle) + ps.Register(&PingTopic, &handler{ + f: pp.Handle, + caps: &handlerCaps{ + raw: true, + }, + }) ps.addAPI(rpc.API{ Namespace: "psstest", Version: "0.3", |