aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-09-25 14:56:57 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:21:31 +0800
commit83677118fade9f9c39fc39ac211dd0c149a09afb (patch)
tree7b2580646d969114458bb156645ea535544bc8bb
parentad73ab6d3d090cfeb959cf72f716083763a37616 (diff)
downloadgo-tangerine-83677118fade9f9c39fc39ac211dd0c149a09afb.tar.gz
go-tangerine-83677118fade9f9c39fc39ac211dd0c149a09afb.tar.zst
go-tangerine-83677118fade9f9c39fc39ac211dd0c149a09afb.zip
p2p: implement AddNotaryPeer and RemoveNotaryPeer
AddNotaryPeer adds node to static node set so that server will maintain the connection with the notary node. AddNotaryPeer also sets the notaryConn flag to allow the node to always connect, even if the slot are full. RemoveNotaryPeer removes node from static, then disconnect and unsets the notaryConn flag.
-rw-r--r--p2p/peer.go2
-rw-r--r--p2p/server.go66
-rw-r--r--p2p/server_test.go121
3 files changed, 184 insertions, 5 deletions
diff --git a/p2p/peer.go b/p2p/peer.go
index af019d07a..4828d3234 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -434,6 +434,7 @@ type PeerInfo struct {
RemoteAddress string `json:"remoteAddress"` // Remote endpoint of the TCP data connection
Inbound bool `json:"inbound"`
Trusted bool `json:"trusted"`
+ Notary bool `json:"notary"`
Static bool `json:"static"`
} `json:"network"`
Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields
@@ -458,6 +459,7 @@ func (p *Peer) Info() *PeerInfo {
info.Network.RemoteAddress = p.RemoteAddr().String()
info.Network.Inbound = p.rw.is(inboundConn)
info.Network.Trusted = p.rw.is(trustedConn)
+ info.Network.Notary = p.rw.is(notaryConn)
info.Network.Static = p.rw.is(staticDialedConn)
// Gather all the running protocol infos
diff --git a/p2p/server.go b/p2p/server.go
index 566f01ffc..15f6ad167 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -180,6 +180,8 @@ type Server struct {
removestatic chan *enode.Node
addtrusted chan *enode.Node
removetrusted chan *enode.Node
+ addnotary chan *enode.Node
+ removenotary chan *enode.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
@@ -203,6 +205,7 @@ const (
staticDialedConn
inboundConn
trustedConn
+ notaryConn
)
// conn wraps a network connection with information gathered
@@ -254,6 +257,9 @@ func (f connFlag) String() string {
if f&inboundConn != 0 {
s += "-inbound"
}
+ if f&notaryConn != 0 {
+ s += "-notary"
+ }
if s != "" {
s = s[1:]
}
@@ -344,6 +350,27 @@ func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
}
}
+// AddNotaryPeer connects to the given node and maintains the connection until the
+// server is shut down. If the connection fails for any reason, the server will
+// attempt to reconnect the peer.
+// AddNotaryPeer also adds the given node to a reserved whitelist which allows the
+// node to always connect, even if the slot are full.
+func (srv *Server) AddNotaryPeer(node *discover.Node) {
+ select {
+ case srv.addnotary <- node:
+ case <-srv.quit:
+ }
+}
+
+// RemoveNotaryPeer disconnects from the given node.
+// RemoveNotaryPeer also removes the given node from the notary peer set.
+func (srv *Server) RemoveNotaryPeer(node *discover.Node) {
+ select {
+ case srv.removenotary <- node:
+ case <-srv.quit:
+ }
+}
+
// SubscribePeers subscribes the given channel to peer events
func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
return srv.peerFeed.Subscribe(ch)
@@ -440,6 +467,8 @@ func (srv *Server) Start() (err error) {
srv.removestatic = make(chan *enode.Node)
srv.addtrusted = make(chan *enode.Node)
srv.removetrusted = make(chan *enode.Node)
+ srv.addnotary = make(chan *enode.Node)
+ srv.removenotary = make(chan *enode.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
@@ -610,6 +639,7 @@ func (srv *Server) run(dialstate dialer) {
peers = make(map[enode.ID]*Peer)
inboundCount = 0
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
+ notary = make(map[enode.ID]bool)
taskdone = make(chan task, maxActiveDialTasks)
runningTasks []task
queuedTasks []task // tasks that can't run yet
@@ -693,6 +723,33 @@ running:
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false)
}
+ case n := <-srv.addnotary:
+ // This channel is used by AddNotaryPeer to add to the
+ // ephemeral notary peer list. Add it to the dialer,
+ // it will keep the node connected.
+ srv.log.Trace("Adding notary node", "node", n)
+ notary[n.ID] = true
+ if p, ok := peers[n.ID]; ok {
+ p.rw.set(notaryConn, true)
+ }
+ dialstate.addStatic(n)
+ case n := <-srv.removenotary:
+ // This channel is used by RemoveNotaryPeer to send a
+ // disconnect request to a peer and begin the
+ // stop keeping the node connected.
+ srv.log.Trace("Removing notary node", "node", n)
+ if _, ok := notary[n.ID]; ok {
+ delete(notary, n.ID)
+ }
+
+ if p, ok := peers[n.ID]; ok {
+ p.rw.set(notaryConn, false)
+ }
+
+ dialstate.removeStatic(n)
+ if p, ok := peers[n.ID]; ok {
+ p.Disconnect(DiscRequested)
+ }
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
@@ -711,6 +768,11 @@ running:
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
+
+ if notary[c.id] {
+ c.flags |= notaryConn
+ }
+
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
@@ -791,9 +853,9 @@ func (srv *Server) protoHandshakeChecks(peers map[enode.ID]*Peer, inboundCount i
func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
switch {
- case !c.is(trustedConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
+ case !c.is(trustedConn|notaryConn|staticDialedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
- case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
+ case !c.is(trustedConn|notaryConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
diff --git a/p2p/server_test.go b/p2p/server_test.go
index f665c1424..c3ff825a3 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"golang.org/x/crypto/sha3"
@@ -172,10 +173,14 @@ func TestServerDial(t *testing.T) {
}
// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
+ // Test AddNotaryPeer/RemoveTrustedPeer and changing Notary flags.
// Particularly for race conditions on changing the flag state.
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted prematurely: %v", peer)
}
+ if peer := srv.Peers()[0]; peer.Info().Network.Notary {
+ t.Errorf("peer is notary prematurely: %v", peer)
+ }
done := make(chan bool)
go func() {
srv.AddTrustedPeer(node)
@@ -186,6 +191,15 @@ func TestServerDial(t *testing.T) {
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
}
+
+ srv.AddNotaryPeer(node)
+ if peer := srv.Peers()[0]; !peer.Info().Network.Notary {
+ t.Errorf("peer is not notary after AddNotaryPeer: %v", peer)
+ }
+ srv.RemoveNotaryPeer(node)
+ if peer := srv.Peers()[0]; peer.Info().Network.Notary {
+ t.Errorf("peer is notary after RemoveNotaryPeer: %v", peer)
+ }
done <- true
}()
// Trigger potential race conditions
@@ -202,6 +216,73 @@ func TestServerDial(t *testing.T) {
}
}
+// TestNotaryPeer checks that the node is added to and remove from static when
+// AddNotaryPeer and RemoveNotaryPeer is called.
+func TestNotaryPeer(t *testing.T) {
+ var (
+ returned = make(chan struct{})
+ add, remove = make(chan *discover.Node), make(chan *discover.Node)
+ tg = taskgen{
+ newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
+ return []task{}
+ },
+ doneFunc: func(t task) {},
+ addFunc: func(n *discover.Node) {
+ add <- n
+ },
+ removeFunc: func(n *discover.Node) {
+ remove <- n
+ },
+ }
+ )
+
+ srv := &Server{
+ Config: Config{MaxPeers: 10},
+ quit: make(chan struct{}),
+ ntab: fakeTable{},
+ addnotary: make(chan *discover.Node),
+ removenotary: make(chan *discover.Node),
+ running: true,
+ log: log.New(),
+ }
+ srv.loopWG.Add(1)
+ go func() {
+ srv.run(tg)
+ close(returned)
+ }()
+
+ notaryID := randomID()
+ go srv.AddNotaryPeer(&discover.Node{ID: notaryID})
+
+ select {
+ case n := <-add:
+ if n.ID != notaryID {
+ t.Errorf("node ID mismatched: got %s, want %s",
+ n.ID.String(), notaryID.String())
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("add static is not called within one second")
+ }
+
+ go srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
+ select {
+ case n := <-remove:
+ if n.ID != notaryID {
+ t.Errorf("node ID mismatched: got %s, want %s",
+ n.ID.String(), notaryID.String())
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("remove static is not called within one second")
+ }
+
+ srv.Stop()
+ select {
+ case <-returned:
+ case <-time.After(500 * time.Millisecond):
+ t.Error("Server.run did not return within 500ms")
+ }
+}
+
// This test checks that tasks generated by dialstate are
// actually executed and taskdone is called for them.
func TestServerTaskScheduling(t *testing.T) {
@@ -326,6 +407,9 @@ func TestServerManyTasks(t *testing.T) {
type taskgen struct {
newFunc func(running int, peers map[enode.ID]*Peer) []task
doneFunc func(task)
+
+ addFunc func(*discover.Node)
+ removeFunc func(*discover.Node)
}
func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task {
@@ -334,9 +418,11 @@ func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time)
func (tg taskgen) taskDone(t task, now time.Time) {
tg.doneFunc(t)
}
-func (tg taskgen) addStatic(*enode.Node) {
+func (tg taskgen) addStatic(n *enode.Node) {
+ tg.addFunc(n)
}
-func (tg taskgen) removeStatic(*enode.Node) {
+func (tg taskgen) removeStatic(n *enode.Node) {
+ tg.removeFunc(n)
}
type testTask struct {
@@ -350,10 +436,11 @@ func (t *testTask) Do(srv *Server) {
// This test checks that connections are disconnected
// just after the encryption handshake when the server is
-// at capacity. Trusted connections should still be accepted.
+// at capacity. Trusted and Notary connections should still be accepted.
func TestServerAtCap(t *testing.T) {
trustedNode := newkey()
trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
+ notaryID := randomID()
srv := &Server{
Config: Config{
PrivateKey: newkey(),
@@ -366,6 +453,7 @@ func TestServerAtCap(t *testing.T) {
t.Fatalf("could not start: %v", err)
}
defer srv.Stop()
+ srv.AddNotaryPeer(&discover.Node{ID: notaryID})
newconn := func(id enode.ID) *conn {
fd, _ := net.Pipe()
@@ -396,6 +484,15 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag")
}
+ // Try inserting a notary connection.
+ c = newconn(notaryID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ t.Error("unexpected error for notary conn @posthandshake:", err)
+ }
+ if !c.is(notaryConn) {
+ t.Error("Server did not set notary flag")
+ }
+
// Remove from trusted set and try again
srv.RemoveTrustedPeer(newNode(trustedID, nil))
c = newconn(trustedID)
@@ -412,6 +509,24 @@ func TestServerAtCap(t *testing.T) {
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
}
+
+ // Remove from notary set and try again
+ srv.RemoveNotaryPeer(&discover.Node{ID: notaryID})
+ c = newconn(notaryID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
+ t.Error("wrong error for insert:", err)
+ }
+
+ // Add anotherID to notary set and try again
+ anotherNotaryID := randomID()
+ srv.AddNotaryPeer(&discover.Node{ID: anotherNotaryID})
+ c = newconn(anotherNotaryID)
+ if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ t.Error("unexpected error for notary conn @posthandshake:", err)
+ }
+ if !c.is(notaryConn) {
+ t.Error("Server did not set notary flag")
+ }
}
func TestServerPeerLimits(t *testing.T) {