diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-25 14:56:57 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:21:31 +0800 |
commit | 83677118fade9f9c39fc39ac211dd0c149a09afb (patch) | |
tree | 7b2580646d969114458bb156645ea535544bc8bb | |
parent | ad73ab6d3d090cfeb959cf72f716083763a37616 (diff) | |
download | go-tangerine-83677118fade9f9c39fc39ac211dd0c149a09afb.tar.gz go-tangerine-83677118fade9f9c39fc39ac211dd0c149a09afb.tar.zst go-tangerine-83677118fade9f9c39fc39ac211dd0c149a09afb.zip |
p2p: implement AddNotaryPeer and RemoveNotaryPeer
AddNotaryPeer adds node to static node set so that server will maintain
the connection with the notary node.
AddNotaryPeer also sets the notaryConn flag to allow the node to always
connect, even if the slot are full.
RemoveNotaryPeer removes node from static, then disconnect and unsets
the notaryConn flag.
-rw-r--r-- | p2p/peer.go | 2 | ||||
-rw-r--r-- | p2p/server.go | 66 | ||||
-rw-r--r-- | p2p/server_test.go | 121 |
3 files changed, 184 insertions, 5 deletions
diff --git a/p2p/peer.go b/p2p/peer.go index af019d07a..4828d3234 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -434,6 +434,7 @@ type PeerInfo struct { RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection Inbound bool `json:"inbound"` Trusted bool `json:"trusted"` + Notary bool `json:"notary"` Static bool `json:"static"` } `json:"network"` Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields @@ -458,6 +459,7 @@ func (p *Peer) Info() *PeerInfo { info.Network.RemoteAddress = p.RemoteAddr().String() info.Network.Inbound = p.rw.is(inboundConn) info.Network.Trusted = p.rw.is(trustedConn) + info.Network.Notary = p.rw.is(notaryConn) info.Network.Static = p.rw.is(staticDialedConn) // Gather all the running protocol infos diff --git a/p2p/server.go b/p2p/server.go index 566f01ffc..15f6ad167 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -180,6 +180,8 @@ type Server struct { removestatic chan *enode.Node addtrusted chan *enode.Node removetrusted chan *enode.Node + addnotary chan *enode.Node + removenotary chan *enode.Node posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop @@ -203,6 +205,7 @@ const ( staticDialedConn inboundConn trustedConn + notaryConn ) // conn wraps a network connection with information gathered @@ -254,6 +257,9 @@ func (f connFlag) String() string { if f&inboundConn != 0 { s += "-inbound" } + if f¬aryConn != 0 { + s += "-notary" + } if s != "" { s = s[1:] } @@ -344,6 +350,27 @@ func (srv *Server) RemoveTrustedPeer(node *enode.Node) { } } +// AddNotaryPeer connects to the given node and maintains the connection until the +// server is shut down. If the connection fails for any reason, the server will +// attempt to reconnect the peer. +// AddNotaryPeer also adds the given node to a reserved whitelist which allows the +// node to always connect, even if the slot are full. +func (srv *Server) AddNotaryPeer(node *discover.Node) { + select { + case srv.addnotary <- node: + case <-srv.quit: + } +} + +// RemoveNotaryPeer disconnects from the given node. +// RemoveNotaryPeer also removes the given node from the notary peer set. +func (srv *Server) RemoveNotaryPeer(node *discover.Node) { + select { + case srv.removenotary <- node: + case <-srv.quit: + } +} + // SubscribePeers subscribes the given channel to peer events func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { return srv.peerFeed.Subscribe(ch) @@ -440,6 +467,8 @@ func (srv *Server) Start() (err error) { srv.removestatic = make(chan *enode.Node) srv.addtrusted = make(chan *enode.Node) srv.removetrusted = make(chan *enode.Node) + srv.addnotary = make(chan *enode.Node) + srv.removenotary = make(chan *enode.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{}) @@ -610,6 +639,7 @@ func (srv *Server) run(dialstate dialer) { peers = make(map[enode.ID]*Peer) inboundCount = 0 trusted = make(map[enode.ID]bool, len(srv.TrustedNodes)) + notary = make(map[enode.ID]bool) taskdone = make(chan task, maxActiveDialTasks) runningTasks []task queuedTasks []task // tasks that can't run yet @@ -693,6 +723,33 @@ running: if p, ok := peers[n.ID()]; ok { p.rw.set(trustedConn, false) } + case n := <-srv.addnotary: + // This channel is used by AddNotaryPeer to add to the + // ephemeral notary peer list. Add it to the dialer, + // it will keep the node connected. + srv.log.Trace("Adding notary node", "node", n) + notary[n.ID] = true + if p, ok := peers[n.ID]; ok { + p.rw.set(notaryConn, true) + } + dialstate.addStatic(n) + case n := <-srv.removenotary: + // This channel is used by RemoveNotaryPeer to send a + // disconnect request to a peer and begin the + // stop keeping the node connected. + srv.log.Trace("Removing notary node", "node", n) + if _, ok := notary[n.ID]; ok { + delete(notary, n.ID) + } + + if p, ok := peers[n.ID]; ok { + p.rw.set(notaryConn, false) + } + + dialstate.removeStatic(n) + if p, ok := peers[n.ID]; ok { + p.Disconnect(DiscRequested) + } case op := <-srv.peerOp: // This channel is used by Peers and PeerCount. op(peers) @@ -711,6 +768,11 @@ running: // Ensure that the trusted flag is set before checking against MaxPeers. c.flags |= trustedConn } + + if notary[c.id] { + c.flags |= notaryConn + } + // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): @@ -791,9 +853,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error { switch { - case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers: + case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected diff --git a/p2p/server_test.go b/p2p/server_test.go index f665c1424..c3ff825a3 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "golang.org/x/crypto/sha3" @@ -172,10 +173,14 @@ func TestServerDial(t *testing.T) { } // Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags + // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags. // Particularly for race conditions on changing the flag state. if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted prematurely: %v", peer) } + if peer := srv.Peers()[0]; peer.Info().Network.Notary { + t.Errorf("peer is notary prematurely: %v", peer) + } done := make(chan bool) go func() { srv.AddTrustedPeer(node) @@ -186,6 +191,15 @@ func TestServerDial(t *testing.T) { if peer := srv.Peers()[0]; peer.Info().Network.Trusted { t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer) } + + srv.AddNotaryPeer(node) + if peer := srv.Peers()[0]; !peer.Info().Network.Notary { + t.Errorf("peer is not notary after AddNotaryPeer: %v", peer) + } + srv.RemoveNotaryPeer(node) + if peer := srv.Peers()[0]; peer.Info().Network.Notary { + t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer) + } done <- true }() // Trigger potential race conditions @@ -202,6 +216,73 @@ func TestServerDial(t *testing.T) { } } +// TestNotaryPeer checks that the node is added to and remove from static when +// AddNotaryPeer and RemoveNotaryPeer is called. +func TestNotaryPeer(t *testing.T) { + var ( + returned = make(chan struct{}) + add, remove = make(chan *discover.Node), make(chan *discover.Node) + tg = taskgen{ + newFunc: func(running int, peers map[discover.NodeID]*Peer) []task { + return []task{} + }, + doneFunc: func(t task) {}, + addFunc: func(n *discover.Node) { + add <- n + }, + removeFunc: func(n *discover.Node) { + remove <- n + }, + } + ) + + srv := &Server{ + Config: Config{MaxPeers: 10}, + quit: make(chan struct{}), + ntab: fakeTable{}, + addnotary: make(chan *discover.Node), + removenotary: make(chan *discover.Node), + running: true, + log: log.New(), + } + srv.loopWG.Add(1) + go func() { + srv.run(tg) + close(returned) + }() + + notaryID := randomID() + go srv.AddNotaryPeer(&discover.Node{ID: notaryID}) + + select { + case n := <-add: + if n.ID != notaryID { + t.Errorf("node ID mismatched: got %s, want %s", + n.ID.String(), notaryID.String()) + } + case <-time.After(1 * time.Second): + t.Error("add static is not called within one second") + } + + go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) + select { + case n := <-remove: + if n.ID != notaryID { + t.Errorf("node ID mismatched: got %s, want %s", + n.ID.String(), notaryID.String()) + } + case <-time.After(1 * time.Second): + t.Error("remove static is not called within one second") + } + + srv.Stop() + select { + case <-returned: + case <-time.After(500 * time.Millisecond): + t.Error("Server.run did not return within 500ms") + } +} + // This test checks that tasks generated by dialstate are // actually executed and taskdone is called for them. func TestServerTaskScheduling(t *testing.T) { @@ -326,6 +407,9 @@ func TestServerManyTasks(t *testing.T) { type taskgen struct { newFunc func(running int, peers map[enode.ID]*Peer) []task doneFunc func(task) + + addFunc func(*discover.Node) + removeFunc func(*discover.Node) } func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task { @@ -334,9 +418,11 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) func (tg taskgen) taskDone(t task, now time.Time) { tg.doneFunc(t) } -func (tg taskgen) addStatic(*enode.Node) { +func (tg taskgen) addStatic(n *enode.Node) { + tg.addFunc(n) } -func (tg taskgen) removeStatic(*enode.Node) { +func (tg taskgen) removeStatic(n *enode.Node) { + tg.removeFunc(n) } type testTask struct { @@ -350,10 +436,11 @@ func (t *testTask) Do(srv *Server) { // This test checks that connections are disconnected // just after the encryption handshake when the server is -// at capacity. Trusted connections should still be accepted. +// at capacity. Trusted and Notary connections should still be accepted. func TestServerAtCap(t *testing.T) { trustedNode := newkey() trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey) + notaryID := randomID() srv := &Server{ Config: Config{ PrivateKey: newkey(), @@ -366,6 +453,7 @@ func TestServerAtCap(t *testing.T) { t.Fatalf("could not start: %v", err) } defer srv.Stop() + srv.AddNotaryPeer(&discover.Node{ID: notaryID}) newconn := func(id enode.ID) *conn { fd, _ := net.Pipe() @@ -396,6 +484,15 @@ func TestServerAtCap(t *testing.T) { t.Error("Server did not set trusted flag") } + // Try inserting a notary connection. + c = newconn(notaryID) + if err := srv.checkpoint(c, srv.posthandshake); err != nil { + t.Error("unexpected error for notary conn @posthandshake:", err) + } + if !c.is(notaryConn) { + t.Error("Server did not set notary flag") + } + // Remove from trusted set and try again srv.RemoveTrustedPeer(newNode(trustedID, nil)) c = newconn(trustedID) @@ -412,6 +509,24 @@ func TestServerAtCap(t *testing.T) { if !c.is(trustedConn) { t.Error("Server did not set trusted flag") } + + // Remove from notary set and try again + srv.RemoveNotaryPeer(&discover.Node{ID: notaryID}) + c = newconn(notaryID) + if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { + t.Error("wrong error for insert:", err) + } + + // Add anotherID to notary set and try again + anotherNotaryID := randomID() + srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID}) + c = newconn(anotherNotaryID) + if err := srv.checkpoint(c, srv.posthandshake); err != nil { + t.Error("unexpected error for notary conn @posthandshake:", err) + } + if !c.is(notaryConn) { + t.Error("Server did not set notary flag") + } } func TestServerPeerLimits(t *testing.T) { |