aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-01-31 19:40:39 +0800
committerWei-Ning Huang <w@dexon.org>2019-03-12 12:19:09 +0800
commit452f0abfd598240cfe290438ac9bb3525d7ee83d (patch)
tree5993714f3c3bf3310f25a4f8d242174e617e05e8 /p2p
parent2be32e25545ff3bc322b1e5b55de4aa5d4d394a3 (diff)
downloaddexon-452f0abfd598240cfe290438ac9bb3525d7ee83d.tar.gz
dexon-452f0abfd598240cfe290438ac9bb3525d7ee83d.tar.zst
dexon-452f0abfd598240cfe290438ac9bb3525d7ee83d.zip
p2p, dex: rework connection management (#183)
* p2p, dex: rework connection management * dex: refresh our node record periodically * dex: don't send new record event if no new record
Diffstat (limited to 'p2p')
-rw-r--r--p2p/dial.go59
-rw-r--r--p2p/dial_test.go131
-rw-r--r--p2p/server.go146
-rw-r--r--p2p/server_test.go90
4 files changed, 26 insertions, 400 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index 909bed863..99acade36 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -64,12 +64,6 @@ func (t TCPDialer) Dial(dest *enode.Node) (net.Conn, error) {
return t.Dialer.Dial("tcp", addr.String())
}
-type dialGroup struct {
- name string
- nodes map[enode.ID]*enode.Node
- num uint64
-}
-
// dialstate schedules dials and discovery lookups.
// it get's a chance to compute new tasks on every iteration
// of the main loop in Server.run.
@@ -85,7 +79,6 @@ type dialstate struct {
randomNodes []*enode.Node // filled from Table
static map[enode.ID]*dialTask
direct map[enode.ID]*dialTask
- group map[string]*dialGroup
hist *dialHistory
start time.Time // time when the dialer was first used
@@ -143,7 +136,6 @@ func newDialState(self enode.ID, static []*enode.Node, bootnodes []*enode.Node,
netrestrict: netrestrict,
static: make(map[enode.ID]*dialTask),
direct: make(map[enode.ID]*dialTask),
- group: make(map[string]*dialGroup),
dialing: make(map[enode.ID]connFlag),
bootnodes: make([]*enode.Node, len(bootnodes)),
randomNodes: make([]*enode.Node, maxdyn/2),
@@ -179,14 +171,6 @@ func (s *dialstate) removeDirect(n *enode.Node) {
s.hist.remove(n.ID())
}
-func (s *dialstate) addGroup(g *dialGroup) {
- s.group[g.name] = g
-}
-
-func (s *dialstate) removeGroup(g *dialGroup) {
- delete(s.group, g.name)
-}
-
func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Time) []task {
if s.start.IsZero() {
s.start = now
@@ -244,49 +228,6 @@ func (s *dialstate) newTasks(nRunning int, peers map[enode.ID]*Peer, now time.Ti
}
}
- // compute connected
- connected := map[string]map[enode.ID]struct{}{}
- for _, g := range s.group {
- connected[g.name] = map[enode.ID]struct{}{}
- }
-
- for id := range peers {
- for _, g := range s.group {
- if _, ok := g.nodes[id]; ok {
- connected[g.name][id] = struct{}{}
- }
- }
- }
-
- for id := range s.dialing {
- for _, g := range s.group {
- if _, ok := g.nodes[id]; ok {
- connected[g.name][id] = struct{}{}
- }
- }
- }
-
- groupNodes := map[enode.ID]*enode.Node{}
- for _, g := range s.group {
- for _, n := range g.nodes {
- if uint64(len(connected[g.name])) >= g.num {
- break
- }
- err := s.checkDial(n, peers)
- switch err {
- case errNotWhitelisted, errSelf:
- log.Warn("Removing group dial candidate", "id", n.ID(), "addr", &net.TCPAddr{IP: n.IP(), Port: n.TCP()}, "err", err)
- delete(g.nodes, n.ID())
- case nil:
- groupNodes[n.ID()] = n
- connected[g.name][n.ID()] = struct{}{}
- }
- }
- }
- for _, n := range groupNodes {
- addDial(groupDialedConn, n)
- }
-
// If we don't have any peers whatsoever, try to dial a random bootnode. This
// scenario is useful for the testnet (and private networks) where the discovery
// table might be full of mostly bad peers, making it hard to find good ones.
diff --git a/p2p/dial_test.go b/p2p/dial_test.go
index 35e439798..ab687c2ea 100644
--- a/p2p/dial_test.go
+++ b/p2p/dial_test.go
@@ -611,137 +611,6 @@ func TestDialStateDirectDial(t *testing.T) {
})
}
-func TestDialStateGroupDial(t *testing.T) {
- groups := []*dialGroup{
- {
- name: "g1",
- nodes: map[enode.ID]*enode.Node{
- uintID(1): newNode(uintID(1), nil),
- uintID(2): newNode(uintID(2), nil),
- },
- num: 2,
- },
- {
- name: "g2",
- nodes: map[enode.ID]*enode.Node{
- uintID(2): newNode(uintID(2), nil),
- uintID(3): newNode(uintID(3), nil),
- uintID(4): newNode(uintID(4), nil),
- uintID(5): newNode(uintID(5), nil),
- uintID(6): newNode(uintID(6), nil),
- },
- num: 2,
- },
- }
-
- type groupTest struct {
- peers []*Peer
- dialing map[enode.ID]connFlag
- ceiling map[string]uint64
- }
-
- tests := []groupTest{
- {
- peers: nil,
- dialing: map[enode.ID]connFlag{},
- ceiling: map[string]uint64{"g1": 2, "g2": 4},
- },
- {
- peers: []*Peer{
- {rw: &conn{flags: staticDialedConn, node: newNode(uintID(2), nil)}},
- },
- dialing: map[enode.ID]connFlag{
- uintID(1): staticDialedConn,
- },
- ceiling: map[string]uint64{"g1": 2, "g2": 2},
- },
- {
- peers: []*Peer{
- {rw: &conn{flags: staticDialedConn, node: newNode(uintID(1), nil)}},
- {rw: &conn{flags: staticDialedConn, node: newNode(uintID(3), nil)}},
- {rw: &conn{flags: staticDialedConn, node: newNode(uintID(4), nil)}},
- {rw: &conn{flags: staticDialedConn, node: newNode(uintID(5), nil)}},
- },
- dialing: map[enode.ID]connFlag{
- uintID(2): staticDialedConn,
- },
- ceiling: map[string]uint64{"g1": 2, "g2": 4},
- },
- {
- peers: nil,
- dialing: map[enode.ID]connFlag{
- uintID(1): staticDialedConn,
- uintID(2): staticDialedConn,
- uintID(3): staticDialedConn,
- },
- ceiling: map[string]uint64{"g1": 2, "g2": 4},
- },
- }
-
- pm := func(ps []*Peer) map[enode.ID]*Peer {
- m := make(map[enode.ID]*Peer)
- for _, p := range ps {
- m[p.rw.node.ID()] = p
- }
- return m
- }
-
- run := func(i int, tt groupTest) {
- d := newDialState(enode.ID{}, nil, nil, fakeTable{}, 0, nil)
- d.dialing = make(map[enode.ID]connFlag)
- for k, v := range tt.dialing {
- d.dialing[k] = v
- }
-
- for _, g := range groups {
- d.addGroup(g)
- }
- peermap := pm(tt.peers)
- new := d.newTasks(len(tt.dialing), peermap, time.Now())
-
- cnt := map[string]uint64{}
- for id := range peermap {
- for _, g := range groups {
- if _, ok := g.nodes[id]; ok {
- cnt[g.name]++
- }
- }
- }
-
- for id := range tt.dialing {
- for _, g := range groups {
- if _, ok := g.nodes[id]; ok {
- cnt[g.name]++
- }
- }
- }
-
- for _, task := range new {
- id := task.(*dialTask).dest.ID()
- for _, g := range groups {
- if _, ok := g.nodes[id]; ok {
- cnt[g.name]++
- }
- }
- }
-
- for _, g := range groups {
- if cnt[g.name] < g.num {
- t.Errorf("test %d) group %s peers + dialing + new < num (%d < %d)",
- i, g.name, cnt[g.name], g.num)
- }
- if cnt[g.name] > tt.ceiling[g.name] {
- t.Errorf("test %d) group %s peers + dialing + new > ceiling (%d > %d)",
- i, g.name, cnt[g.name], tt.ceiling[g.name])
- }
- }
- }
-
- for i, tt := range tests {
- run(i, tt)
- }
-}
-
// This test checks that static peers will be redialed immediately if they were re-added to a static list.
func TestDialStaticAfterReset(t *testing.T) {
wantStatic := []*enode.Node{
diff --git a/p2p/server.go b/p2p/server.go
index 36b1721a5..58b76a708 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -182,8 +182,6 @@ type Server struct {
removedirect chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
- addgroup chan *dialGroup
- removegroup chan *dialGroup
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -206,7 +204,6 @@ const (
dynDialedConn connFlag = 1 << iota
staticDialedConn
directDialedConn
- groupDialedConn
inboundConn
trustedConn
)
@@ -260,9 +257,6 @@ func (f connFlag) String() string {
if f&directDialedConn != 0 {
s += "-directdial"
}
- if f&groupDialedConn != 0 {
- s += "-groupdial"
- }
if f&inboundConn != 0 {
s += "-inbound"
}
@@ -357,26 +351,6 @@ func (srv *Server) RemoveDirectPeer(node *enode.Node) {
}
}
-func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) {
- m := map[enode.ID]*enode.Node{}
- for _, node := range nodes {
- m[node.ID()] = node
- }
- g := &dialGroup{name: name, nodes: m, num: num}
- select {
- case srv.addgroup <- g:
- case <-srv.quit:
- }
-}
-
-func (srv *Server) RemoveGroup(name string) {
- g := &dialGroup{name: name}
- select {
- case srv.removegroup <- g:
- case <-srv.quit:
- }
-}
-
// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
func (srv *Server) AddTrustedPeer(node *enode.Node) {
@@ -524,8 +498,6 @@ func (srv *Server) Start() (err error) {
srv.removestatic = make(chan *enode.Node)
srv.adddirect = make(chan *enode.Node)
srv.removedirect = make(chan *enode.Node)
- srv.addgroup = make(chan *dialGroup)
- srv.removegroup = make(chan *dialGroup)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
@@ -689,8 +661,6 @@ type dialer interface {
removeStatic(*enode.Node)
addDirect(*enode.Node)
removeDirect(*enode.Node)
- addGroup(*dialGroup)
- removeGroup(*dialGroup)
}
func (srv *Server) run(dialstate dialer) {
@@ -699,15 +669,12 @@ func (srv *Server) run(dialstate dialer) {
defer srv.nodedb.Close()
var (
- peers = make(map[enode.ID]*Peer)
- inboundCount = 0
- trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
- peerflags = make(map[enode.ID]connFlag)
- groupRefCount = make(map[enode.ID]int32)
- groups = make(map[string]*dialGroup)
- taskdone = make(chan task, maxActiveDialTasks)
- runningTasks []task
- queuedTasks []task // tasks that can't run yet
+ peers = make(map[enode.ID]*Peer)
+ inboundCount = 0
+ trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ taskdone = make(chan task, maxActiveDialTasks)
+ runningTasks []task
+ queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
@@ -745,60 +712,6 @@ func (srv *Server) run(dialstate dialer) {
}
}
- // remember and maintain the connection flags locally
- setConnFlags := func(id enode.ID, f connFlag, val bool) {
- if p, ok := peers[id]; ok {
- p.rw.set(f, val)
- }
- if val {
- peerflags[id] |= f
- } else {
- peerflags[id] &= ^f
- }
- if peerflags[id] == 0 {
- delete(peerflags, id)
- }
- }
-
- // Put trusted nodes into a map to speed up checks.
- // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
- for _, n := range srv.TrustedNodes {
- setConnFlags(n.ID(), trustedConn, true)
- }
-
- canDisconnect := func(p *Peer) bool {
- f, ok := peerflags[p.ID()]
- if ok && f != 0 {
- return false
- }
- return !p.rw.is(dynDialedConn | inboundConn)
- }
-
- removeGroup := func(g *dialGroup) {
- if gg, ok := groups[g.name]; ok {
- for id := range gg.nodes {
- groupRefCount[id]--
- if groupRefCount[id] == 0 {
- setConnFlags(id, groupDialedConn, false)
- delete(groupRefCount, id)
- }
- }
- }
- }
-
- addGroup := func(g *dialGroup) {
- if _, ok := groups[g.name]; ok {
- removeGroup(groups[g.name])
- }
- for id := range g.nodes {
- groupRefCount[id]++
- if groupRefCount[id] > 0 {
- setConnFlags(id, groupDialedConn, true)
- }
- }
- groups[g.name] = g
- }
-
running:
for {
scheduleTasks()
@@ -812,16 +725,14 @@ running:
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
- setConnFlags(n.ID(), staticDialedConn, true)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
- setConnFlags(n.ID(), staticDialedConn, false)
dialstate.removeStatic(n)
- if p, ok := peers[n.ID()]; ok && canDisconnect(p) {
+ if p, ok := peers[n.ID()]; ok {
p.Disconnect(DiscRequested)
}
case n := <-srv.adddirect:
@@ -829,42 +740,36 @@ running:
// ephemeral direct peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding direct node", "node", n)
- setConnFlags(n.ID(), directDialedConn, true)
- if p, ok := peers[n.ID()]; ok {
- p.rw.set(directDialedConn, true)
- }
dialstate.addDirect(n)
case n := <-srv.removedirect:
// This channel is used by RemoveDirectPeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing direct node", "node", n)
- setConnFlags(n.ID(), directDialedConn, false)
+ dialstate.removeDirect(n)
if p, ok := peers[n.ID()]; ok {
- p.rw.set(directDialedConn, false)
- if !p.rw.is(trustedConn | groupDialedConn) {
- p.Disconnect(DiscRequested)
- }
+ p.Disconnect(DiscRequested)
}
- dialstate.removeDirect(n)
- case g := <-srv.addgroup:
- srv.log.Trace("Adding group", "group", g)
- addGroup(g)
- dialstate.addGroup(g)
- case g := <-srv.removegroup:
- srv.log.Trace("Removing group", "group", g)
- removeGroup(g)
- dialstate.removeGroup(g)
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
- setConnFlags(n.ID(), trustedConn, true)
+ trusted[n.ID()] = true
+ // Mark any already-connected peer as trusted
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(trustedConn, true)
+ }
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
- setConnFlags(n.ID(), trustedConn, false)
+ if _, ok := trusted[n.ID()]; ok {
+ delete(trusted, n.ID())
+ }
+ // Unmark any already-connected peer as trusted
+ if p, ok := peers[n.ID()]; ok {
+ p.rw.set(trustedConn, false)
+ }
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -879,8 +784,9 @@ running:
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
- if f, ok := peerflags[c.node.ID()]; ok {
- c.flags |= f
+ if trusted[c.node.ID()] {
+ // Ensure that the trusted flag is set before checking against MaxPeers.
+ c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
@@ -962,9 +868,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|staticDialedConn|directDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|directDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 8bd113791..734b2a8c1 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -203,92 +203,6 @@ func TestServerDial(t *testing.T) {
}
}
-func TestServerPeerConnFlag(t *testing.T) {
- srv := &Server{
- Config: Config{
- PrivateKey: newkey(),
- MaxPeers: 10,
- NoDial: true,
- },
- }
- if err := srv.Start(); err != nil {
- t.Fatalf("could not start: %v", err)
- }
- defer srv.Stop()
-
- // inject a peer
- key := newkey()
- id := enode.PubkeyToIDV4(&key.PublicKey)
- node := newNode(id, nil)
- fd, _ := net.Pipe()
- c := &conn{
- node: node,
- fd: fd,
- transport: newTestTransport(&key.PublicKey, fd),
- flags: inboundConn,
- cont: make(chan error),
- }
- if err := srv.checkpoint(c, srv.addpeer); err != nil {
- t.Fatalf("could not add conn: %v", err)
- }
-
- srv.AddTrustedPeer(node)
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != (inboundConn | trustedConn) {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, (inboundConn | trustedConn))
- }
-
- srv.AddDirectPeer(node)
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != (inboundConn | trustedConn | directDialedConn) {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, (inboundConn | trustedConn | directDialedConn))
- }
-
- srv.AddGroup("g1", []*enode.Node{node}, 1)
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn))
- }
-
- srv.AddGroup("g2", []*enode.Node{node}, 1)
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != (inboundConn | trustedConn | directDialedConn | groupDialedConn) {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, (inboundConn | trustedConn | directDialedConn | groupDialedConn))
- }
-
- srv.RemoveTrustedPeer(node)
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != (inboundConn | directDialedConn | groupDialedConn) {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, (inboundConn | directDialedConn | directDialedConn))
- }
-
- srv.RemoveDirectPeer(node)
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != (inboundConn | groupDialedConn) {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, (inboundConn | directDialedConn))
- }
-
- srv.RemoveGroup("g1")
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != (inboundConn | groupDialedConn) {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, (inboundConn | directDialedConn))
- }
-
- srv.RemoveGroup("g2")
- srv.Peers() // leverage this function to ensure trusted peer is added
- if c.flags != inboundConn {
- t.Errorf("flags mismatch: got %d, want %d",
- c.flags, inboundConn)
- }
-}
-
// This test checks that tasks generated by dialstate are
// actually executed and taskdone is called for them.
func TestServerTaskScheduling(t *testing.T) {
@@ -429,10 +343,6 @@ func (tg taskgen) addDirect(*enode.Node) {
}
func (tg taskgen) removeDirect(*enode.Node) {
}
-func (tg taskgen) addGroup(*dialGroup) {
-}
-func (tg taskgen) removeGroup(*dialGroup) {
-}
type testTask struct {
index int