aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/peer_test.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2014-11-22 04:48:49 +0800
committerFelix Lange <fjl@twurst.com>2014-11-22 04:52:45 +0800
commit59b63caf5e4de64ceb7dcdf01551a080f53b1672 (patch)
treea4e79590284c5afe4d6927b422a5092b074e7938 /p2p/peer_test.go
parente4a601c6444afdc11ce0cb80d7fd83116de2c8b9 (diff)
downloadgo-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.gz
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.zst
go-tangerine-59b63caf5e4de64ceb7dcdf01551a080f53b1672.zip
p2p: API cleanup and PoC 7 compatibility
Whoa, one more big commit. I didn't manage to untangle the changes while working towards compatibility.
Diffstat (limited to 'p2p/peer_test.go')
-rw-r--r--p2p/peer_test.go308
1 files changed, 220 insertions, 88 deletions
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
index da62cc380..1afa0ab17 100644
--- a/p2p/peer_test.go
+++ b/p2p/peer_test.go
@@ -1,90 +1,222 @@
package p2p
-// "net"
-
-// func TestPeer(t *testing.T) {
-// handlers := make(Handlers)
-// testProtocol := &TestProtocol{recv: make(chan testMsg)}
-// handlers["aaa"] = func(p *Peer) Protocol { return testProtocol }
-// handlers["ccc"] = func(p *Peer) Protocol { return testProtocol }
-// addr := &TestAddr{"test:30"}
-// conn := NewTestNetworkConnection(addr)
-// _, server := SetupTestServer(handlers)
-// server.Handshake()
-// peer := NewPeer(conn, addr, true, server)
-// // peer.Messenger().AddProtocols([]string{"aaa", "ccc"})
-// peer.Start()
-// defer peer.Stop()
-// time.Sleep(2 * time.Millisecond)
-// if len(conn.Out) != 1 {
-// t.Errorf("handshake not sent")
-// } else {
-// out := conn.Out[0]
-// packet := Packet(0, HandshakeMsg, P2PVersion, []byte(peer.server.identity.String()), []interface{}{peer.server.protocols}, peer.server.port, peer.server.identity.Pubkey()[1:])
-// if bytes.Compare(out, packet) != 0 {
-// t.Errorf("incorrect handshake packet %v != %v", out, packet)
-// }
-// }
-
-// packet := Packet(0, HandshakeMsg, P2PVersion, []byte("peer"), []interface{}{"bbb", "aaa", "ccc"}, 30, []byte("0000000000000000000000000000000000000000000000000000000000000000"))
-// conn.In(0, packet)
-// time.Sleep(10 * time.Millisecond)
-
-// pro, _ := peer.Messenger().protocols[0].(*BaseProtocol)
-// if pro.state != handshakeReceived {
-// t.Errorf("handshake not received")
-// }
-// if peer.Port != 30 {
-// t.Errorf("port incorrectly set")
-// }
-// if peer.Id != "peer" {
-// t.Errorf("id incorrectly set")
-// }
-// if string(peer.Pubkey) != "0000000000000000000000000000000000000000000000000000000000000000" {
-// t.Errorf("pubkey incorrectly set")
-// }
-// fmt.Println(peer.Caps)
-// if len(peer.Caps) != 3 || peer.Caps[0] != "aaa" || peer.Caps[1] != "bbb" || peer.Caps[2] != "ccc" {
-// t.Errorf("protocols incorrectly set")
-// }
-
-// msg := NewMsg(3)
-// err := peer.Write("aaa", msg)
-// if err != nil {
-// t.Errorf("expect no error for known protocol: %v", err)
-// } else {
-// time.Sleep(1 * time.Millisecond)
-// if len(conn.Out) != 2 {
-// t.Errorf("msg not written")
-// } else {
-// out := conn.Out[1]
-// packet := Packet(16, 3)
-// if bytes.Compare(out, packet) != 0 {
-// t.Errorf("incorrect packet %v != %v", out, packet)
-// }
-// }
-// }
-
-// msg = NewMsg(2)
-// err = peer.Write("ccc", msg)
-// if err != nil {
-// t.Errorf("expect no error for known protocol: %v", err)
-// } else {
-// time.Sleep(1 * time.Millisecond)
-// if len(conn.Out) != 3 {
-// t.Errorf("msg not written")
-// } else {
-// out := conn.Out[2]
-// packet := Packet(21, 2)
-// if bytes.Compare(out, packet) != 0 {
-// t.Errorf("incorrect packet %v != %v", out, packet)
-// }
-// }
-// }
-
-// err = peer.Write("bbb", msg)
-// time.Sleep(1 * time.Millisecond)
-// if err == nil {
-// t.Errorf("expect error for unknown protocol")
-// }
-// }
+import (
+ "bufio"
+ "net"
+ "reflect"
+ "testing"
+ "time"
+)
+
+var discard = Protocol{
+ Name: "discard",
+ Length: 1,
+ Run: func(p *Peer, rw MsgReadWriter) error {
+ for {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if err = msg.Discard(); err != nil {
+ return err
+ }
+ }
+ },
+}
+
+func testPeer(protos []Protocol) (net.Conn, *Peer, <-chan error) {
+ conn1, conn2 := net.Pipe()
+ id := NewSimpleClientIdentity("test", "0", "0", "public key")
+ peer := newPeer(conn1, protos, nil)
+ peer.ourID = id
+ peer.pubkeyHook = func(*peerAddr) error { return nil }
+ errc := make(chan error, 1)
+ go func() {
+ _, err := peer.loop()
+ errc <- err
+ }()
+ return conn2, peer, errc
+}
+
+func TestPeerProtoReadMsg(t *testing.T) {
+ defer testlog(t).detach()
+
+ done := make(chan struct{})
+ proto := Protocol{
+ Name: "a",
+ Length: 5,
+ Run: func(peer *Peer, rw MsgReadWriter) error {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ t.Errorf("read error: %v", err)
+ }
+ if msg.Code != 2 {
+ t.Errorf("incorrect msg code %d relayed to protocol", msg.Code)
+ }
+ data, err := msg.Data()
+ if err != nil {
+ t.Errorf("data decoding error: %v", err)
+ }
+ expdata := []interface{}{1, []byte{0x30, 0x30, 0x30}}
+ if !reflect.DeepEqual(data.Slice(), expdata) {
+ t.Errorf("incorrect msg data %#v", data.Slice())
+ }
+ close(done)
+ return nil
+ },
+ }
+
+ net, peer, errc := testPeer([]Protocol{proto})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{proto.cap()})
+
+ writeMsg(net, NewMsg(18, 1, "000"))
+ select {
+ case <-done:
+ case err := <-errc:
+ t.Errorf("peer returned: %v", err)
+ case <-time.After(2 * time.Second):
+ t.Errorf("receive timeout")
+ }
+}
+
+func TestPeerProtoReadLargeMsg(t *testing.T) {
+ defer testlog(t).detach()
+
+ msgsize := uint32(10 * 1024 * 1024)
+ done := make(chan struct{})
+ proto := Protocol{
+ Name: "a",
+ Length: 5,
+ Run: func(peer *Peer, rw MsgReadWriter) error {
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ t.Errorf("read error: %v", err)
+ }
+ if msg.Size != msgsize+4 {
+ t.Errorf("incorrect msg.Size, got %d, expected %d", msg.Size, msgsize)
+ }
+ msg.Discard()
+ close(done)
+ return nil
+ },
+ }
+
+ net, peer, errc := testPeer([]Protocol{proto})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{proto.cap()})
+
+ writeMsg(net, NewMsg(18, make([]byte, msgsize)))
+ select {
+ case <-done:
+ case err := <-errc:
+ t.Errorf("peer returned: %v", err)
+ case <-time.After(2 * time.Second):
+ t.Errorf("receive timeout")
+ }
+}
+
+func TestPeerProtoEncodeMsg(t *testing.T) {
+ defer testlog(t).detach()
+
+ proto := Protocol{
+ Name: "a",
+ Length: 2,
+ Run: func(peer *Peer, rw MsgReadWriter) error {
+ if err := rw.EncodeMsg(2); err == nil {
+ t.Error("expected error for out-of-range msg code, got nil")
+ }
+ if err := rw.EncodeMsg(1); err != nil {
+ t.Errorf("write error: %v", err)
+ }
+ return nil
+ },
+ }
+ net, peer, _ := testPeer([]Protocol{proto})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{proto.cap()})
+
+ bufr := bufio.NewReader(net)
+ msg, err := readMsg(bufr)
+ if err != nil {
+ t.Errorf("read error: %v", err)
+ }
+ if msg.Code != 17 {
+ t.Errorf("incorrect message code: got %d, expected %d", msg.Code, 17)
+ }
+}
+
+func TestPeerWrite(t *testing.T) {
+ defer testlog(t).detach()
+
+ net, peer, peerErr := testPeer([]Protocol{discard})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{discard.cap()})
+
+ // test write errors
+ if err := peer.writeProtoMsg("b", NewMsg(3)); err == nil {
+ t.Errorf("expected error for unknown protocol, got nil")
+ }
+ if err := peer.writeProtoMsg("discard", NewMsg(8)); err == nil {
+ t.Errorf("expected error for out-of-range msg code, got nil")
+ } else if perr, ok := err.(*peerError); !ok || perr.Code != errInvalidMsgCode {
+ t.Errorf("wrong error for out-of-range msg code, got %#v", err)
+ }
+
+ // setup for reading the message on the other end
+ read := make(chan struct{})
+ go func() {
+ bufr := bufio.NewReader(net)
+ msg, err := readMsg(bufr)
+ if err != nil {
+ t.Errorf("read error: %v", err)
+ } else if msg.Code != 16 {
+ t.Errorf("wrong code, got %d, expected %d", msg.Code, 16)
+ }
+ msg.Discard()
+ close(read)
+ }()
+
+ // test succcessful write
+ if err := peer.writeProtoMsg("discard", NewMsg(0)); err != nil {
+ t.Errorf("expect no error for known protocol: %v", err)
+ }
+ select {
+ case <-read:
+ case err := <-peerErr:
+ t.Fatalf("peer stopped: %v", err)
+ }
+}
+
+func TestPeerActivity(t *testing.T) {
+ // shorten inactivityTimeout while this test is running
+ oldT := inactivityTimeout
+ defer func() { inactivityTimeout = oldT }()
+ inactivityTimeout = 20 * time.Millisecond
+
+ net, peer, peerErr := testPeer([]Protocol{discard})
+ defer net.Close()
+ peer.startSubprotocols([]Cap{discard.cap()})
+
+ sub := peer.activity.Subscribe(time.Time{})
+ defer sub.Unsubscribe()
+
+ for i := 0; i < 6; i++ {
+ writeMsg(net, NewMsg(16))
+ select {
+ case <-sub.Chan():
+ case <-time.After(inactivityTimeout / 2):
+ t.Fatal("no event within ", inactivityTimeout/2)
+ case err := <-peerErr:
+ t.Fatal("peer error", err)
+ }
+ }
+
+ select {
+ case <-time.After(inactivityTimeout * 2):
+ case <-sub.Chan():
+ t.Fatal("got activity event while connection was inactive")
+ case err := <-peerErr:
+ t.Fatal("peer error", err)
+ }
+}