diff options
author | Sonic <sonic@dexon.org> | 2019-01-31 19:40:39 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:57 +0800 |
commit | 07bb51fb817b0e8db9453b185a3684c95a494342 (patch) | |
tree | 0b70dd8c1f190bdee7c778a971fb46297766fe5f /p2p | |
parent | 2e939de0678b7ef8da6a0306270e0ef126a8df01 (diff) | |
download | dexon-07bb51fb817b0e8db9453b185a3684c95a494342.tar.gz dexon-07bb51fb817b0e8db9453b185a3684c95a494342.tar.zst dexon-07bb51fb817b0e8db9453b185a3684c95a494342.zip |
p2p, dex: rework connection management (#183)
* p2p, dex: rework connection management
* dex: refresh our node record periodically
* dex: don't send new record event if no new record
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/dial.go | 59 | ||||
-rw-r--r-- | p2p/dial_test.go | 131 | ||||
-rw-r--r-- | p2p/server.go | 146 | ||||
-rw-r--r-- | p2p/server_test.go | 90 |
4 files changed, 26 insertions, 400 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index 909bed863..99acade36 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -64,12 +64,6 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) { return t.Dialer.Dial("tcp", addr.String()) } -type dialGroup struct { - name string - nodes map[enode.ID]*enode.Node - num uint64 -} - // dialstate schedules dials and discovery lookups. // it get's a chance to compute new tasks on every iteration // of the main loop in Server.run. @@ -85,7 +79,6 @@ type dialstate struct { randomNodes []*enode.Node // filled from Table static map[enode.ID]*dialTask direct map[enode.ID]*dialTask - group map[string]*dialGroup hist *dialHistory start time.Time // time when the dialer was first used @@ -143,7 +136,6 @@ func newDialState(self enode.ID, static []*enode.Node, bootnodes []*enode.Node, netrestrict: netrestrict, static: make(map[enode.ID]*dialTask), direct: make(map[enode.ID]*dialTask), - group: make(map[string]*dialGroup), dialing: make(map[enode.ID]connFlag), bootnodes: make([]*enode.Node, len(bootnodes)), randomNodes: make([]*enode.Node, maxdyn/2), @@ -179,14 +171,6 @@ func (s *dialstate) removeDirect(n *enode.Node) { s.hist.remove(n.ID()) } -func (s *dialstate) addGroup(g *dialGroup) { - s.group[g.name] = g -} - -func (s *dialstate) removeGroup(g *dialGroup) { - delete(s.group, g.name) -} - func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task { if s.start.IsZero() { s.start = now @@ -244,49 +228,6 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti } } - // compute connected - connected := map[string]map[enode.ID]struct{}{} - for _, g := range s.group { - connected[g.name] = map[enode.ID]struct{}{} - } - - for id := range peers { - for _, g := range s.group { - if _, ok := g.nodes[id]; ok { - connected[g.name][id] = struct{}{} - } - } - } - - for id := range s.dialing { - for _, g := range s.group { - if _, ok := g.nodes[id]; ok { - connected[g.name][id] = struct{}{} - } - } - } - - groupNodes := map[enode.ID]*enode.Node{} - for _, g := range s.group { - for _, n := range g.nodes { - if uint64(len(connected[g.name])) >= g.num { - break - } - err := s.checkDial(n, peers) - switch err { - case errNotWhitelisted, errSelf: - log.Warn("Removing group dial candidate", "id", n.ID(), "addr", &net.TCPAddr{IP: n.IP(), Port: n.TCP()}, "err", err) - delete(g.nodes, n.ID()) - case nil: - groupNodes[n.ID()] = n - connected[g.name][n.ID()] = struct{}{} - } - } - } - for _, n := range groupNodes { - addDial(groupDialedConn, n) - } - // If we don't have any peers whatsoever, try to dial a random bootnode. This // scenario is useful for the testnet (and private networks) where the discovery // table might be full of mostly bad peers, making it hard to find good ones. diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 35e439798..ab687c2ea 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -611,137 +611,6 @@ func TestDialStateDirectDial(t *testing.T) { }) } -func TestDialStateGroupDial(t *testing.T) { - groups := []*dialGroup{ - { - name: "g1", - nodes: map[enode.ID]*enode.Node{ - uintID(1): newNode(uintID(1), nil), - uintID(2): newNode(uintID(2), nil), - }, - num: 2, - }, - { - name: "g2", - nodes: map[enode.ID]*enode.Node{ - uintID(2): newNode(uintID(2), nil), - uintID(3): newNode(uintID(3), nil), - uintID(4): newNode(uintID(4), nil), - uintID(5): newNode(uintID(5), nil), - uintID(6): newNode(uintID(6), nil), - }, - num: 2, - }, - } - - type groupTest struct { - peers []*Peer - dialing map[enode.ID]connFlag - ceiling map[string]uint64 - } - - tests := []groupTest{ - { - peers: nil, - dialing: map[enode.ID]connFlag{}, - ceiling: map[string]uint64{"g1": 2, "g2": 4}, - }, - { - peers: []*Peer{ - {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}}, - }, - dialing: map[enode.ID]connFlag{ - uintID(1): staticDialedConn, - }, - ceiling: map[string]uint64{"g1": 2, "g2": 2}, - }, - { - peers: []*Peer{ - {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}}, - {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}}, - {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}}, - {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}}, - }, - dialing: map[enode.ID]connFlag{ - uintID(2): staticDialedConn, - }, - ceiling: map[string]uint64{"g1": 2, "g2": 4}, - }, - { - peers: nil, - dialing: map[enode.ID]connFlag{ - uintID(1): staticDialedConn, - uintID(2): staticDialedConn, - uintID(3): staticDialedConn, - }, - ceiling: map[string]uint64{"g1": 2, "g2": 4}, - }, - } - - pm := func(ps []*Peer) map[enode.ID]*Peer { - m := make(map[enode.ID]*Peer) - for _, p := range ps { - m[p.rw.node.ID()] = p - } - return m - } - - run := func(i int, tt groupTest) { - d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil) - d.dialing = make(map[enode.ID]connFlag) - for k, v := range tt.dialing { - d.dialing[k] = v - } - - for _, g := range groups { - d.addGroup(g) - } - peermap := pm(tt.peers) - new := d.newTasks(len(tt.dialing), peermap, time.Now()) - - cnt := map[string]uint64{} - for id := range peermap { - for _, g := range groups { - if _, ok := g.nodes[id]; ok { - cnt[g.name]++ - } - } - } - - for id := range tt.dialing { - for _, g := range groups { - if _, ok := g.nodes[id]; ok { - cnt[g.name]++ - } - } - } - - for _, task := range new { - id := task.(*dialTask).dest.ID() - for _, g := range groups { - if _, ok := g.nodes[id]; ok { - cnt[g.name]++ - } - } - } - - for _, g := range groups { - if cnt[g.name] < g.num { - t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)", - i, g.name, cnt[g.name], g.num) - } - if cnt[g.name] > tt.ceiling[g.name] { - t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)", - i, g.name, cnt[g.name], tt.ceiling[g.name]) - } - } - } - - for i, tt := range tests { - run(i, tt) - } -} - // This test checks that static peers will be redialed immediately if they were re-added to a static list. func TestDialStaticAfterReset(t *testing.T) { wantStatic := []*enode.Node{ diff --git a/p2p/server.go b/p2p/server.go index 36b1721a5..58b76a708 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -182,8 +182,6 @@ type Server struct { removedirect chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node - addgroup chan *dialGroup - removegroup chan *dialGroup posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -206,7 +204,6 @@ const ( dynDialedConn connFlag = 1 << iota staticDialedConn directDialedConn - groupDialedConn inboundConn trustedConn ) @@ -260,9 +257,6 @@ func (f connFlag) String() string { if f&directDialedConn != 0 { s += "-directdial" } - if f&groupDialedConn != 0 { - s += "-groupdial" - } if f&inboundConn != 0 { s += "-inbound" } @@ -357,26 +351,6 @@ func (srv *Server) RemoveDirectPeer(node *enode.Node) { } } -func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) { - m := map[enode.ID]*enode.Node{} - for _, node := range nodes { - m[node.ID()] = node - } - g := &dialGroup{name: name, nodes: m, num: num} - select { - case srv.addgroup <- g: - case <-srv.quit: - } -} - -func (srv *Server) RemoveGroup(name string) { - g := &dialGroup{name: name} - select { - case srv.removegroup <- g: - case <-srv.quit: - } -} - // AddTrustedPeer adds the given node to a reserved whitelist which allows the // node to always connect, even if the slot are full. func (srv *Server) AddTrustedPeer(node *enode.Node) { @@ -524,8 +498,6 @@ func (srv *Server) Start() (err error) { srv.removestatic = make(chan *enode.Node) srv.adddirect = make(chan *enode.Node) srv.removedirect = make(chan *enode.Node) - srv.addgroup = make(chan *dialGroup) - srv.removegroup = make(chan *dialGroup) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) @@ -689,8 +661,6 @@ type dialer interface { removeStatic(*enode.Node) addDirect(*enode.Node) removeDirect(*enode.Node) - addGroup(*dialGroup) - removeGroup(*dialGroup) } func (srv *Server) run(dialstate dialer) { @@ -699,15 +669,12 @@ func (srv *Server) run(dialstate dialer) { defer srv.nodedb.Close() var ( - peers = make(map[enode.ID]*Peer) - inboundCount = 0 - trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) - peerflags = make(map[enode.ID]connFlag) - groupRefCount = make(map[enode.ID]int32) - groups = make(map[string]*dialGroup) - taskdone = make(chan task, maxActiveDialTasks) - runningTasks []task - queuedTasks []task // tasks that can't run yet + peers = make(map[enode.ID]*Peer) + inboundCount = 0 + trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + taskdone = make(chan task, maxActiveDialTasks) + runningTasks []task + queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. @@ -745,60 +712,6 @@ func (srv *Server) run(dialstate dialer) { } } - // remember and maintain the connection flags locally - setConnFlags := func(id enode.ID, f connFlag, val bool) { - if p, ok := peers[id]; ok { - p.rw.set(f, val) - } - if val { - peerflags[id] |= f - } else { - peerflags[id] &= ^f - } - if peerflags[id] == 0 { - delete(peerflags, id) - } - } - - // Put trusted nodes into a map to speed up checks. - // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. - for _, n := range srv.TrustedNodes { - setConnFlags(n.ID(), trustedConn, true) - } - - canDisconnect := func(p *Peer) bool { - f, ok := peerflags[p.ID()] - if ok && f != 0 { - return false - } - return !p.rw.is(dynDialedConn | inboundConn) - } - - removeGroup := func(g *dialGroup) { - if gg, ok := groups[g.name]; ok { - for id := range gg.nodes { - groupRefCount[id]-- - if groupRefCount[id] == 0 { - setConnFlags(id, groupDialedConn, false) - delete(groupRefCount, id) - } - } - } - } - - addGroup := func(g *dialGroup) { - if _, ok := groups[g.name]; ok { - removeGroup(groups[g.name]) - } - for id := range g.nodes { - groupRefCount[id]++ - if groupRefCount[id] > 0 { - setConnFlags(id, groupDialedConn, true) - } - } - groups[g.name] = g - } - running: for { scheduleTasks() @@ -812,16 +725,14 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding static node", "node", n) - setConnFlags(n.ID(), staticDialedConn, true) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing static node", "node", n) - setConnFlags(n.ID(), staticDialedConn, false) dialstate.removeStatic(n) - if p, ok := peers[n.ID()]; ok && canDisconnect(p) { + if p, ok := peers[n.ID()]; ok { p.Disconnect(DiscRequested) } case n := <-srv.adddirect: @@ -829,42 +740,36 @@ running: // ephemeral direct peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding direct node", "node", n) - setConnFlags(n.ID(), directDialedConn, true) - if p, ok := peers[n.ID()]; ok { - p.rw.set(directDialedConn, true) - } dialstate.addDirect(n) case n := <-srv.removedirect: // This channel is used by RemoveDirectPeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing direct node", "node", n) - setConnFlags(n.ID(), directDialedConn, false) + dialstate.removeDirect(n) if p, ok := peers[n.ID()]; ok { - p.rw.set(directDialedConn, false) - if !p.rw.is(trustedConn | groupDialedConn) { - p.Disconnect(DiscRequested) - } + p.Disconnect(DiscRequested) } - dialstate.removeDirect(n) - case g := <-srv.addgroup: - srv.log.Trace("Adding group", "group", g) - addGroup(g) - dialstate.addGroup(g) - case g := <-srv.removegroup: - srv.log.Trace("Removing group", "group", g) - removeGroup(g) - dialstate.removeGroup(g) case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add an enode // to the trusted node set. srv.log.Trace("Adding trusted node", "node", n) - setConnFlags(n.ID(), trustedConn, true) + trusted[n.ID()] = true + // Mark any already-connected peer as trusted + if p, ok := peers[n.ID()]; ok { + p.rw.set(trustedConn, true) + } case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove an enode // from the trusted node set. srv.log.Trace("Removing trusted node", "node", n) - setConnFlags(n.ID(), trustedConn, false) + if _, ok := trusted[n.ID()]; ok { + delete(trusted, n.ID()) + } + // Unmark any already-connected peer as trusted + if p, ok := peers[n.ID()]; ok { + p.rw.set(trustedConn, false) + } case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -879,8 +784,9 @@ running: case c := <-srv.posthandshake: // A connection has passed the encryption handshake so // the remote identity is known (but hasn't been verified yet). - if f, ok := peerflags[c.node.ID()]; ok { - c.flags |= f + if trusted[c.node.ID()] { + // Ensure that the trusted flag is set before checking against MaxPeers. + c.flags |= trustedConn } // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { @@ -962,9 +868,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|staticDialedConn|directDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|directDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected diff --git a/p2p/server_test.go b/p2p/server_test.go index 8bd113791..734b2a8c1 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -203,92 +203,6 @@ func TestServerDial(t *testing.T) { } } -func TestServerPeerConnFlag(t *testing.T) { - srv := &Server{ - Config: Config{ - PrivateKey: newkey(), - MaxPeers: 10, - NoDial: true, - }, - } - if err := srv.Start(); err != nil { - t.Fatalf("could not start: %v", err) - } - defer srv.Stop() - - // inject a peer - key := newkey() - id := enode.PubkeyToIDV4(&key.PublicKey) - node := newNode(id, nil) - fd, _ := net.Pipe() - c := &conn{ - node: node, - fd: fd, - transport: newTestTransport(&key.PublicKey, fd), - flags: inboundConn, - cont: make(chan error), - } - if err := srv.checkpoint(c, srv.addpeer); err != nil { - t.Fatalf("could not add conn: %v", err) - } - - srv.AddTrustedPeer(node) - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != (inboundConn | trustedConn) { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, (inboundConn | trustedConn)) - } - - srv.AddDirectPeer(node) - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != (inboundConn | trustedConn | directDialedConn) { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, (inboundConn | trustedConn | directDialedConn)) - } - - srv.AddGroup("g1", []*enode.Node{node}, 1) - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) - } - - srv.AddGroup("g2", []*enode.Node{node}, 1) - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn)) - } - - srv.RemoveTrustedPeer(node) - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != (inboundConn | directDialedConn | groupDialedConn) { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, (inboundConn | directDialedConn | directDialedConn)) - } - - srv.RemoveDirectPeer(node) - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != (inboundConn | groupDialedConn) { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, (inboundConn | directDialedConn)) - } - - srv.RemoveGroup("g1") - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != (inboundConn | groupDialedConn) { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, (inboundConn | directDialedConn)) - } - - srv.RemoveGroup("g2") - srv.Peers() // leverage this function to ensure trusted peer is added - if c.flags != inboundConn { - t.Errorf("flags mismatch: got %d, want %d", - c.flags, inboundConn) - } -} - // This test checks that tasks generated by dialstate are // actually executed and taskdone is called for them. func TestServerTaskScheduling(t *testing.T) { @@ -429,10 +343,6 @@ func (tg taskgen) addDirect(*enode.Node) { } func (tg taskgen) removeDirect(*enode.Node) { } -func (tg taskgen) addGroup(*dialGroup) { -} -func (tg taskgen) removeGroup(*dialGroup) { -} type testTask struct { index int |