diff options
author | Felix Lange <fjl@twurst.com> | 2015-05-16 06:38:28 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-05-25 07:17:14 +0800 |
commit | 1440f9a37a8baf67b989ddf0b8cc30c9a1970e14 (patch) | |
tree | f8db89ae4aeea16c4cb87877df8fea9688c6ac99 /p2p/server_test.go | |
parent | 9f38ef5d970d1ccb50d2a7697562ea547ff625c8 (diff) | |
download | dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar.gz dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.tar.zst dexon-1440f9a37a8baf67b989ddf0b8cc30c9a1970e14.zip |
p2p: new dialer, peer management without locks
The most visible change is event-based dialing, which should be an
improvement over the timer-based system that we have at the moment.
The dialer gets a chance to compute new tasks whenever peers change or
dials complete. This is better than checking peers on a timer because
dials happen faster. The dialer can now make more precise decisions
about whom to dial based on the peer set and we can test those
decisions without actually opening any sockets.
Peer management is easier to test because the tests can inject
connections at checkpoints (after the encryption handshake, after the
protocol handshake).
Most of the handshake stuff is now part of the RLPx code. It could be
exported or moved to its own package because it is no longer entangled
with Server logic.
Diffstat (limited to 'p2p/server_test.go')
-rw-r--r-- | p2p/server_test.go | 584 |
1 files changed, 270 insertions, 314 deletions
diff --git a/p2p/server_test.go b/p2p/server_test.go index 6f7aaf8e1..01448cc7b 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -2,8 +2,10 @@ package p2p import ( "crypto/ecdsa" + "errors" "math/rand" "net" + "reflect" "testing" "time" @@ -12,29 +14,50 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" ) -func startTestServer(t *testing.T, pf newPeerHook) *Server { +func init() { + // glog.SetV(6) + // glog.SetToStderr(true) +} + +type testTransport struct { + id discover.NodeID + *rlpx + + closeErr error +} + +func newTestTransport(id discover.NodeID, fd net.Conn) transport { + wrapped := newRLPX(fd).(*rlpx) + wrapped.rw = newRLPXFrameRW(fd, secrets{ + MAC: zero16, + AES: zero16, + IngressMAC: sha3.NewKeccak256(), + EgressMAC: sha3.NewKeccak256(), + }) + return &testTransport{id: id, rlpx: wrapped} +} + +func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *discover.Node) (discover.NodeID, error) { + return c.id, nil +} + +func (c *testTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) { + return &protoHandshake{ID: c.id, Name: "test"}, nil +} + +func (c *testTransport) close(err error) { + c.rlpx.fd.Close() + c.closeErr = err +} + +func startTestServer(t *testing.T, id discover.NodeID, pf func(*Peer)) *Server { server := &Server{ - Name: "test", - MaxPeers: 10, - ListenAddr: "127.0.0.1:0", - PrivateKey: newkey(), - newPeerHook: pf, - setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) { - id := randomID() - if !keepconn(id) { - return nil, DiscAlreadyConnected - } - rw := newRlpxFrameRW(fd, secrets{ - MAC: zero16, - AES: zero16, - IngressMAC: sha3.NewKeccak256(), - EgressMAC: sha3.NewKeccak256(), - }) - return &conn{ - MsgReadWriter: rw, - protoHandshake: &protoHandshake{ID: id, Version: baseProtocolVersion}, - }, nil - }, + Name: "test", + MaxPeers: 10, + ListenAddr: "127.0.0.1:0", + PrivateKey: newkey(), + 
newPeerHook: pf, + newTransport: func(fd net.Conn) transport { return newTestTransport(id, fd) }, } if err := server.Start(); err != nil { t.Fatalf("Could not start server: %v", err) @@ -45,7 +68,11 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server { func TestServerListen(t *testing.T) { // start the test server connected := make(chan *Peer) - srv := startTestServer(t, func(p *Peer) { + remid := randomID() + srv := startTestServer(t, remid, func(p *Peer) { + if p.ID() != remid { + t.Error("peer func called with wrong node id") + } if p == nil { t.Error("peer func called with nil conn") } @@ -67,6 +94,10 @@ func TestServerListen(t *testing.T) { t.Errorf("peer started with wrong conn: got %v, want %v", peer.LocalAddr(), conn.RemoteAddr()) } + peers := srv.Peers() + if !reflect.DeepEqual(peers, []*Peer{peer}) { + t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer}) + } case <-time.After(1 * time.Second): t.Error("server did not accept within one second") } @@ -92,23 +123,33 @@ func TestServerDial(t *testing.T) { // start the server connected := make(chan *Peer) - srv := startTestServer(t, func(p *Peer) { connected <- p }) + remid := randomID() + srv := startTestServer(t, remid, func(p *Peer) { connected <- p }) defer close(connected) defer srv.Stop() // tell the server to connect tcpAddr := listener.Addr().(*net.TCPAddr) - srv.staticDial <- &discover.Node{IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)} + srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}) select { case conn := <-accepted: select { case peer := <-connected: + if peer.ID() != remid { + t.Errorf("peer has wrong id") + } + if peer.Name() != "test" { + t.Errorf("peer has wrong name") + } if peer.RemoteAddr().String() != conn.LocalAddr().String() { t.Errorf("peer started with wrong conn: got %v, want %v", peer.RemoteAddr(), conn.LocalAddr()) } - // TODO: validate more fields + peers := srv.Peers() + if !reflect.DeepEqual(peers, []*Peer{peer}) { + 
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer}) + } case <-time.After(1 * time.Second): t.Error("server did not launch peer within one second") } @@ -118,331 +159,250 @@ func TestServerDial(t *testing.T) { } } -// This test checks that connections are disconnected -// just after the encryption handshake when the server is -// at capacity. -// -// It also serves as a light-weight integration test. -func TestServerDisconnectAtCap(t *testing.T) { - started := make(chan *Peer) - srv := &Server{ - ListenAddr: "127.0.0.1:0", - PrivateKey: newkey(), - MaxPeers: 10, - NoDial: true, - // This hook signals that the peer was actually started. We - // need to wait for the peer to be started before dialing the - // next connection to get a deterministic peer count. - newPeerHook: func(p *Peer) { started <- p }, - } - if err := srv.Start(); err != nil { - t.Fatal(err) - } - defer srv.Stop() - - nconns := srv.MaxPeers + 1 - dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)} - for i := 0; i < nconns; i++ { - conn, err := dialer.Dial("tcp", srv.ListenAddr) - if err != nil { - t.Fatalf("conn %d: dial error: %v", i, err) - } - // Close the connection when the test ends, before - // shutting down the server. - defer conn.Close() - // Run the handshakes just like a real peer would. - key := newkey() - hs := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} - _, err = setupConn(conn, key, hs, srv.Self(), keepalways) - if i == nconns-1 { - // When handling the last connection, the server should - // disconnect immediately instead of running the protocol - // handshake. - if err != DiscTooManyPeers { - t.Errorf("conn %d: got error %q, expected %q", i, err, DiscTooManyPeers) - } - } else { - // For all earlier connections, the handshake should go through. - if err != nil { - t.Fatalf("conn %d: unexpected error: %v", i, err) - } - // Wait for runPeer to be started. 
- <-started +// This test checks that tasks generated by dialstate are +// actually executed and taskdone is called for them. +func TestServerTaskScheduling(t *testing.T) { + var ( + done = make(chan *testTask) + quit, returned = make(chan struct{}), make(chan struct{}) + tc = 0 + tg = taskgen{ + newFunc: func(running int, peers map[discover.NodeID]*Peer) []task { + tc++ + return []task{&testTask{index: tc - 1}} + }, + doneFunc: func(t task) { + select { + case done <- t.(*testTask): + case <-quit: + } + }, } - } -} + ) -// Tests that static peers are (re)connected, and done so even above max peers. -func TestServerStaticPeers(t *testing.T) { - // Create a test server with limited connection slots - started := make(chan *Peer) - server := &Server{ - ListenAddr: "127.0.0.1:0", - PrivateKey: newkey(), - MaxPeers: 3, - newPeerHook: func(p *Peer) { started <- p }, - staticCycle: time.Second, - } - if err := server.Start(); err != nil { - t.Fatal(err) + // The Server in this test isn't actually running + // because we're only interested in what run does. 
+ srv := &Server{ + MaxPeers: 10, + quit: make(chan struct{}), + ntab: fakeTable{}, + running: true, } - defer server.Stop() - - // Fill up all the slots on the server - dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)} - for i := 0; i < server.MaxPeers; i++ { - // Establish a new connection - conn, err := dialer.Dial("tcp", server.ListenAddr) - if err != nil { - t.Fatalf("conn %d: dial error: %v", i, err) - } - defer conn.Close() + srv.loopWG.Add(1) + go func() { + srv.run(tg) + close(returned) + }() - // Run the handshakes just like a real peer would, and wait for completion - key := newkey() - shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} - if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil { - t.Fatalf("conn %d: unexpected error: %v", i, err) - } - <-started + var gotdone []*testTask + for i := 0; i < 100; i++ { + gotdone = append(gotdone, <-done) } - // Open a TCP listener to accept static connections - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("failed to setup listener: %v", err) - } - defer listener.Close() - - connected := make(chan net.Conn) - go func() { - for i := 0; i < 3; i++ { - conn, err := listener.Accept() - if err == nil { - connected <- conn - } + for i, task := range gotdone { + if task.index != i { + t.Errorf("task %d has wrong index, got %d", i, task.index) + break + } + if !task.called { + t.Errorf("task %d was not called", i) + break } - }() - // Inject a static node and wait for a remote dial, then redial, then nothing - addr := listener.Addr().(*net.TCPAddr) - static := &discover.Node{ - ID: discover.PubkeyID(&newkey().PublicKey), - IP: addr.IP, - TCP: uint16(addr.Port), } - server.AddPeer(static) + close(quit) + srv.Stop() select { - case conn := <-connected: - // Close the first connection, expect redial - conn.Close() - - case <-time.After(2 * server.staticCycle): - t.Fatalf("remote dial timeout") + case 
<-returned: + case <-time.After(500 * time.Millisecond): + t.Error("Server.run did not return within 500ms") } +} - select { - case conn := <-connected: - // Keep the second connection, don't expect redial - defer conn.Close() - - case <-time.After(2 * server.staticCycle): - t.Fatalf("remote re-dial timeout") - } +type taskgen struct { + newFunc func(running int, peers map[discover.NodeID]*Peer) []task + doneFunc func(task) +} - select { - case <-time.After(2 * server.staticCycle): - // Timeout as no dial occurred +func (tg taskgen) newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task { + return tg.newFunc(running, peers) +} +func (tg taskgen) taskDone(t task, now time.Time) { + tg.doneFunc(t) +} +func (tg taskgen) addStatic(*discover.Node) { +} - case <-connected: - t.Fatalf("connected node dialed") - } +type testTask struct { + index int + called bool } -// Tests that trusted peers and can connect above max peer caps. -func TestServerTrustedPeers(t *testing.T) { +func (t *testTask) Do(srv *Server) { + t.called = true +} - // Create a trusted peer to accept connections from - key := newkey() - trusted := &discover.Node{ - ID: discover.PubkeyID(&key.PublicKey), - } - // Create a test server with limited connection slots - started := make(chan *Peer) - server := &Server{ - ListenAddr: "127.0.0.1:0", +// This test checks that connections are disconnected +// just after the encryption handshake when the server is +// at capacity. Trusted connections should still be accepted. 
+func TestServerAtCap(t *testing.T) { + trustedID := randomID() + srv := &Server{ PrivateKey: newkey(), - MaxPeers: 3, + MaxPeers: 10, NoDial: true, - TrustedNodes: []*discover.Node{trusted}, - newPeerHook: func(p *Peer) { started <- p }, + TrustedNodes: []*discover.Node{{ID: trustedID}}, } - if err := server.Start(); err != nil { - t.Fatal(err) + if err := srv.Start(); err != nil { + t.Fatalf("could not start: %v", err) } - defer server.Stop() + defer srv.Stop() - // Fill up all the slots on the server - dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)} - for i := 0; i < server.MaxPeers; i++ { - // Establish a new connection - conn, err := dialer.Dial("tcp", server.ListenAddr) - if err != nil { - t.Fatalf("conn %d: dial error: %v", i, err) - } - defer conn.Close() + newconn := func(id discover.NodeID) *conn { + fd, _ := net.Pipe() + tx := newTestTransport(id, fd) + return &conn{fd: fd, transport: tx, flags: inboundConn, id: id, cont: make(chan error)} + } - // Run the handshakes just like a real peer would, and wait for completion - key := newkey() - shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} - if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil { - t.Fatalf("conn %d: unexpected error: %v", i, err) + // Inject a few connections to fill up the peer set. + for i := 0; i < 10; i++ { + c := newconn(randomID()) + if err := srv.checkpoint(c, srv.addpeer); err != nil { + t.Fatalf("could not add conn %d: %v", i, err) } - <-started } - // Dial from the trusted peer, ensure connection is accepted - conn, err := dialer.Dial("tcp", server.ListenAddr) - if err != nil { - t.Fatalf("trusted node: dial error: %v", err) + // Try inserting a non-trusted connection. 
+ c := newconn(randomID()) + if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { + t.Error("wrong error for insert:", err) } - defer conn.Close() - - shake := &protoHandshake{Version: baseProtocolVersion, ID: trusted.ID} - if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil { - t.Fatalf("trusted node: unexpected error: %v", err) + // Try inserting a trusted connection. + c = newconn(trustedID) + if err := srv.checkpoint(c, srv.posthandshake); err != nil { + t.Error("unexpected error for trusted conn @posthandshake:", err) } - select { - case <-started: - // Ok, trusted peer accepted - - case <-time.After(100 * time.Millisecond): - t.Fatalf("trusted node timeout") + if !c.is(trustedConn) { + t.Error("Server did not set trusted flag") } + } -// Tests that a failed dial will temporarily throttle a peer. -func TestServerMaxPendingDials(t *testing.T) { - // Start a simple test server - server := &Server{ - ListenAddr: "127.0.0.1:0", - PrivateKey: newkey(), - MaxPeers: 10, - MaxPendingPeers: 1, - } - if err := server.Start(); err != nil { - t.Fatal("failed to start test server: %v", err) +func TestServerSetupConn(t *testing.T) { + id := randomID() + srvkey := newkey() + srvid := discover.PubkeyID(&srvkey.PublicKey) + tests := []struct { + dontstart bool + tt *setupTransport + flags connFlag + dialDest *discover.Node + + wantCloseErr error + wantCalls string + }{ + { + dontstart: true, + tt: &setupTransport{id: id}, + wantCalls: "close,", + wantCloseErr: errServerStopped, + }, + { + tt: &setupTransport{id: id, encHandshakeErr: errors.New("read error")}, + flags: inboundConn, + wantCalls: "doEncHandshake,close,", + wantCloseErr: errors.New("read error"), + }, + { + tt: &setupTransport{id: id}, + dialDest: &discover.Node{ID: randomID()}, + flags: dynDialedConn, + wantCalls: "doEncHandshake,close,", + wantCloseErr: DiscUnexpectedIdentity, + }, + { + tt: &setupTransport{id: id, phs: &protoHandshake{ID: randomID()}}, + dialDest: 
&discover.Node{ID: id}, + flags: dynDialedConn, + wantCalls: "doEncHandshake,doProtoHandshake,close,", + wantCloseErr: DiscUnexpectedIdentity, + }, + { + tt: &setupTransport{id: id, protoHandshakeErr: errors.New("foo")}, + dialDest: &discover.Node{ID: id}, + flags: dynDialedConn, + wantCalls: "doEncHandshake,doProtoHandshake,close,", + wantCloseErr: errors.New("foo"), + }, + { + tt: &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}}, + flags: inboundConn, + wantCalls: "doEncHandshake,close,", + wantCloseErr: DiscSelf, + }, + { + tt: &setupTransport{id: id, phs: &protoHandshake{ID: id}}, + flags: inboundConn, + wantCalls: "doEncHandshake,doProtoHandshake,close,", + wantCloseErr: DiscUselessPeer, + }, } - defer server.Stop() - // Simulate two separate remote peers - peers := make(chan *discover.Node, 2) - conns := make(chan net.Conn, 2) - for i := 0; i < 2; i++ { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("listener %d: failed to setup: %v", i, err) - } - defer listener.Close() - - addr := listener.Addr().(*net.TCPAddr) - peers <- &discover.Node{ - ID: discover.PubkeyID(&newkey().PublicKey), - IP: addr.IP, - TCP: uint16(addr.Port), + for i, test := range tests { + srv := &Server{ + PrivateKey: srvkey, + MaxPeers: 10, + NoDial: true, + Protocols: []Protocol{discard}, + newTransport: func(fd net.Conn) transport { return test.tt }, } - go func() { - conn, err := listener.Accept() - if err == nil { - conns <- conn + if !test.dontstart { + if err := srv.Start(); err != nil { + t.Fatalf("couldn't start server: %v", err) } - }() - } - // Request a dial for both peers - go func() { - for i := 0; i < 2; i++ { - server.staticDial <- <-peers // hack piggybacking the static implementation } - }() - - // Make sure only one outbound connection goes through - var conn net.Conn - - select { - case conn = <-conns: - case <-time.After(100 * time.Millisecond): - t.Fatalf("first dial timeout") - } - select { - case conn = <-conns: - 
t.Fatalf("second dial completed prematurely") - case <-time.After(100 * time.Millisecond): - } - // Finish the first dial, check the second - conn.Close() - select { - case conn = <-conns: - conn.Close() - - case <-time.After(100 * time.Millisecond): - t.Fatalf("second dial timeout") + p1, _ := net.Pipe() + srv.setupConn(p1, test.flags, test.dialDest) + if !reflect.DeepEqual(test.tt.closeErr, test.wantCloseErr) { + t.Errorf("test %d: close error mismatch: got %q, want %q", i, test.tt.closeErr, test.wantCloseErr) + } + if test.tt.calls != test.wantCalls { + t.Errorf("test %d: calls mismatch: got %q, want %q", i, test.tt.calls, test.wantCalls) + } } } -func TestServerMaxPendingAccepts(t *testing.T) { - // Start a test server and a peer sink for synchronization - started := make(chan *Peer) - server := &Server{ - ListenAddr: "127.0.0.1:0", - PrivateKey: newkey(), - MaxPeers: 10, - MaxPendingPeers: 1, - NoDial: true, - newPeerHook: func(p *Peer) { started <- p }, - } - if err := server.Start(); err != nil { - t.Fatal("failed to start test server: %v", err) - } - defer server.Stop() +type setupTransport struct { + id discover.NodeID + encHandshakeErr error - // Try and connect to the server on multiple threads concurrently - conns := make([]net.Conn, 2) - for i := 0; i < 2; i++ { - dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)} + phs *protoHandshake + protoHandshakeErr error - conn, err := dialer.Dial("tcp", server.ListenAddr) - if err != nil { - t.Fatalf("failed to dial server: %v", err) - } - conns[i] = conn - } - // Check that a handshake on the second doesn't pass - go func() { - key := newkey() - shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} - if _, err := setupConn(conns[1], key, shake, server.Self(), keepalways); err != nil { - t.Fatalf("failed to run handshake: %v", err) - } - }() - select { - case <-started: - t.Fatalf("handshake on second connection accepted") + calls string + closeErr error +} 
- case <-time.After(time.Second): - } - // Shake on first, check that both go through - go func() { - key := newkey() - shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} - if _, err := setupConn(conns[0], key, shake, server.Self(), keepalways); err != nil { - t.Fatalf("failed to run handshake: %v", err) - } - }() - for i := 0; i < 2; i++ { - select { - case <-started: - case <-time.After(time.Second): - t.Fatalf("peer %d: handshake timeout", i) - } +func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *discover.Node) (discover.NodeID, error) { + c.calls += "doEncHandshake," + return c.id, c.encHandshakeErr +} +func (c *setupTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) { + c.calls += "doProtoHandshake," + if c.protoHandshakeErr != nil { + return nil, c.protoHandshakeErr } + return c.phs, nil +} +func (c *setupTransport) close(err error) { + c.calls += "close," + c.closeErr = err +} + +// setupConn shouldn't write to/read from the connection. +func (c *setupTransport) WriteMsg(Msg) error { + panic("WriteMsg called on setupTransport") +} +func (c *setupTransport) ReadMsg() (Msg, error) { + panic("ReadMsg called on setupTransport") } func newkey() *ecdsa.PrivateKey { @@ -459,7 +419,3 @@ func randomID() (id discover.NodeID) { } return id } - -func keepalways(id discover.NodeID) bool { - return true -} |