diff options
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 271 |
1 files changed, 188 insertions, 83 deletions
diff --git a/p2p/server.go b/p2p/server.go index c3b3a00d4..a58673342 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -33,13 +33,13 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/discv5" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/p2p/nat" "github.com/dexon-foundation/dexon/p2p/netutil" "github.com/dexon-foundation/dexon/rlp" + "github.com/ethereum/go-ethereum/p2p/discover" ) const ( @@ -178,10 +178,12 @@ type Server struct { quit chan struct{} addstatic chan *enode.Node removestatic chan *enode.Node + adddirect chan *enode.Node + removedirect chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node - addnotary chan *enode.Node - removenotary chan *enode.Node + addgroup chan *dialGroup + removegroup chan *dialGroup posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -203,9 +205,10 @@ type connFlag int32 const ( dynDialedConn connFlag = 1 << iota staticDialedConn + directDialedConn + groupDialedConn inboundConn trustedConn - notaryConn ) // conn wraps a network connection with information gathered @@ -254,12 +257,15 @@ func (f connFlag) String() string { if f&staticDialedConn != 0 { s += "-staticdial" } + if f&directDialedConn != 0 { + s += "-directdial" + } + if f&groupDialedConn != 0 { + s += "-groupdial" + } if f&inboundConn != 0 { s += "-inbound" } - if f¬aryConn != 0 { - s += "-notary" - } if s != "" { s = s[1:] } @@ -333,40 +339,57 @@ func (srv *Server) RemovePeer(node *enode.Node) { } } -// AddTrustedPeer adds the given node to a reserved whitelist which allows the -// node to always connect, even if the slot are full. -func (srv *Server) AddTrustedPeer(node *enode.Node) { +// AddDirectPeer connects to the given node and maintains the connection until the +// server is shut down. If the connection fails for any reason, the server will +// attempt to reconnect the peer. +func (srv *Server) AddDirectPeer(node *enode.Node) { select { - case srv.addtrusted <- node: + case srv.adddirect <- node: case <-srv.quit: } } -// RemoveTrustedPeer removes the given node from the trusted peer set. -func (srv *Server) RemoveTrustedPeer(node *enode.Node) { +// RemoveDirectPeer disconnects from the given node +func (srv *Server) RemoveDirectPeer(node *enode.Node) { select { - case srv.removetrusted <- node: + case srv.removedirect <- node: case <-srv.quit: } } -// AddNotaryPeer connects to the given node and maintains the connection until the -// server is shut down. If the connection fails for any reason, the server will -// attempt to reconnect the peer. -// AddNotaryPeer also adds the given node to a reserved whitelist which allows the +func (srv *Server) AddGroup(name string, nodes []*enode.Node, num uint64) { + m := map[enode.ID]*enode.Node{} + for _, node := range nodes { + m[node.ID()] = node + } + g := &dialGroup{name: name, nodes: m, num: num} + select { + case srv.addgroup <- g: + case <-srv.quit: + } +} + +func (srv *Server) RemoveGroup(name string) { + g := &dialGroup{name: name} + select { + case srv.removegroup <- g: + case <-srv.quit: + } +} + +// AddTrustedPeer adds the given node to a reserved whitelist which allows the // node to always connect, even if the slot are full. -func (srv *Server) AddNotaryPeer(node *discover.Node) { +func (srv *Server) AddTrustedPeer(node *enode.Node) { select { - case srv.addnotary <- node: + case srv.addtrusted <- node: case <-srv.quit: } } -// RemoveNotaryPeer disconnects from the given node. -// RemoveNotaryPeer also removes the given node from the notary peer set. -func (srv *Server) RemoveNotaryPeer(node *discover.Node) { +// RemoveTrustedPeer removes the given node from the trusted peer set. +func (srv *Server) RemoveTrustedPeer(node *enode.Node) { select { - case srv.removenotary <- node: + case srv.removetrusted <- node: case <-srv.quit: } } @@ -388,6 +411,36 @@ func (srv *Server) Self() *enode.Node { return ln.Node() } +func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node { + // If the node is running but discovery is off, manually assemble the node infos. + if ntab == nil { + addr := srv.tcpAddr(listener) + return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0) + } + // Otherwise return the discovery node. + return ntab.Self() +} + +func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr { + addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}} + if listener == nil { + return addr // Inbound connections disabled, use zero address. + } + // Otherwise inject the listener address too. + if a, ok := listener.Addr().(*net.TCPAddr); ok { + addr = *a + } + if srv.NAT != nil { + if ip, err := srv.NAT.ExternalIP(); err == nil { + addr.IP = ip + } + } + if addr.IP.IsUnspecified() { + addr.IP = net.IP{127, 0, 0, 1} + } + return addr +} + // Stop terminates the server and all active peer connections. // It blocks until all active connections have been closed. func (srv *Server) Stop() { @@ -465,10 +518,12 @@ func (srv *Server) Start() (err error) { srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *enode.Node) srv.removestatic = make(chan *enode.Node) + srv.adddirect = make(chan *enode.Node) + srv.removedirect = make(chan *enode.Node) + srv.addgroup = make(chan *dialGroup) + srv.removegroup = make(chan *dialGroup) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) - srv.addnotary = make(chan *enode.Node) - srv.removenotary = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) @@ -628,6 +683,10 @@ type dialer interface { taskDone(task, time.Time) addStatic(*enode.Node) removeStatic(*enode.Node) + addDirect(*enode.Node) + removeDirect(*enode.Node) + addGroup(*dialGroup) + removeGroup(*dialGroup) } func (srv *Server) run(dialstate dialer) { @@ -636,13 +695,15 @@ func (srv *Server) run(dialstate dialer) { defer srv.nodedb.Close() var ( - peers = make(map[enode.ID]*Peer) - inboundCount = 0 - trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) - notary = make(map[enode.ID]bool) - taskdone = make(chan task, maxActiveDialTasks) - runningTasks []task - queuedTasks []task // tasks that can't run yet + peers = make(map[enode.ID]*Peer) + inboundCount = 0 + trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + peerflags = make(map[enode.ID]connFlag) + groupRefCount = make(map[enode.ID]int32) + groups = make(map[string]*dialGroup) + taskdone = make(chan task, maxActiveDialTasks) + runningTasks []task + queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. @@ -680,6 +741,60 @@ func (srv *Server) run(dialstate dialer) { } } + // remember and maintain the connection flags locally + setConnFlags := func(id enode.ID, f connFlag, val bool) { + if p, ok := peers[id]; ok { + p.rw.set(f, val) + } + if val { + peerflags[id] |= f + } else { + peerflags[id] &= ^f + } + if peerflags[id] == 0 { + delete(peerflags, id) + } + } + + // Put trusted nodes into a map to speed up checks. + // Trusted peers are loaded on startup or added via AddTrustedPeer RPC. + for _, n := range srv.TrustedNodes { + setConnFlags(n.ID(), trustedConn, true) + } + + canDisconnect := func(p *Peer) bool { + f, ok := peerflags[p.ID()] + if ok && f != 0 { + return false + } + return !p.rw.is(dynDialedConn | inboundConn) + } + + removeGroup := func(g *dialGroup) { + if gg, ok := groups[g.name]; ok { + for id := range gg.nodes { + groupRefCount[id]-- + if groupRefCount[id] == 0 { + setConnFlags(id, groupDialedConn, false) + delete(groupRefCount, id) + } + } + } + } + + addGroup := func(g *dialGroup) { + if _, ok := groups[g.name]; ok { + removeGroup(groups[g.name]) + } + for id := range g.nodes { + groupRefCount[id]++ + if groupRefCount[id] > 0 { + setConnFlags(id, groupDialedConn, true) + } + } + groups[g.name] = g + } + running: for { scheduleTasks() @@ -693,63 +808,59 @@ running: // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. srv.log.Trace("Adding static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, true) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected. srv.log.Trace("Removing static node", "node", n) + setConnFlags(n.ID(), staticDialedConn, false) dialstate.removeStatic(n) - if p, ok := peers[n.ID()]; ok { + if p, ok := peers[n.ID()]; ok && canDisconnect(p) { p.Disconnect(DiscRequested) } + case n := <-srv.adddirect: + // This channel is used by AddDirectPeer to add to the + // ephemeral direct peer list. Add it to the dialer, + // it will keep the node connected. + srv.log.Trace("Adding direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, true) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, true) + } + dialstate.addDirect(n) + case n := <-srv.removedirect: + // This channel is used by RemoveDirectPeer to send a + // disconnect request to a peer and begin the + // stop keeping the node connected. + srv.log.Trace("Removing direct node", "node", n) + setConnFlags(n.ID(), directDialedConn, false) + if p, ok := peers[n.ID()]; ok { + p.rw.set(directDialedConn, false) + if !p.rw.is(trustedConn | groupDialedConn) { + p.Disconnect(DiscRequested) + } + } + dialstate.removeDirect(n) + case g := <-srv.addgroup: + srv.log.Trace("Adding group", "group", g) + addGroup(g) + dialstate.addGroup(g) + case g := <-srv.removegroup: + srv.log.Trace("Removing group", "group", g) + removeGroup(g) + dialstate.removeGroup(g) case n := <-srv.addtrusted: // This channel is used by AddTrustedPeer to add an enode // to the trusted node set. srv.log.Trace("Adding trusted node", "node", n) - trusted[n.ID()] = true - // Mark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, true) - } + setConnFlags(n.ID(), trustedConn, true) case n := <-srv.removetrusted: // This channel is used by RemoveTrustedPeer to remove an enode // from the trusted node set. srv.log.Trace("Removing trusted node", "node", n) - if _, ok := trusted[n.ID()]; ok { - delete(trusted, n.ID()) - } - // Unmark any already-connected peer as trusted - if p, ok := peers[n.ID()]; ok { - p.rw.set(trustedConn, false) - } - case n := <-srv.addnotary: - // This channel is used by AddNotaryPeer to add to the - // ephemeral notary peer list. Add it to the dialer, - // it will keep the node connected. - srv.log.Trace("Adding notary node", "node", n) - notary[n.ID] = true - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, true) - } - dialstate.addStatic(n) - case n := <-srv.removenotary: - // This channel is used by RemoveNotaryPeer to send a - // disconnect request to a peer and begin the - // stop keeping the node connected. - srv.log.Trace("Removing notary node", "node", n) - if _, ok := notary[n.ID]; ok { - delete(notary, n.ID) - } - - if p, ok := peers[n.ID]; ok { - p.rw.set(notaryConn, false) - } - - dialstate.removeStatic(n) - if p, ok := peers[n.ID]; ok { - p.Disconnect(DiscRequested) - } + setConnFlags(n.ID(), trustedConn, false) case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -764,15 +875,9 @@ running: case c := <-srv.posthandshake: // A connection has passed the encryption handshake so // the remote identity is known (but hasn't been verified yet). - if trusted[c.node.ID()] { - // Ensure that the trusted flag is set before checking against MaxPeers. - c.flags |= trustedConn + if f, ok := peerflags[c.node.ID()]; ok { + c.flags |= f } - - if notary[c.id] { - c.flags |= notaryConn - } - // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): @@ -853,9 +958,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|staticDialedConn|directDialedConn|groupDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|directDialedConn|groupDialedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected |