aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go343
1 files changed, 184 insertions, 159 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 4fd1f7d03..9b8030541 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -2,37 +2,56 @@ package p2p
import (
"bytes"
+ "crypto/ecdsa"
"errors"
"fmt"
+ "io"
"net"
+ "runtime"
"sync"
"time"
"github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
const (
- outboundAddressPoolSize = 500
defaultDialTimeout = 10 * time.Second
+ refreshPeersInterval = 30 * time.Second
portMappingUpdateInterval = 15 * time.Minute
portMappingTimeout = 20 * time.Minute
)
var srvlog = logger.NewLogger("P2P Server")
+// MakeName creates a node name that follows the ethereum convention
+// for such names. It adds the operation system name and Go runtime version
+// the name.
+func MakeName(name, version string) string {
+ return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version())
+}
+
// Server manages all peer connections.
//
// The fields of Server are used as configuration parameters.
// You should set them before starting the Server. Fields may not be
// modified while the server is running.
type Server struct {
- // This field must be set to a valid client identity.
- Identity ClientIdentity
+ // This field must be set to a valid secp256k1 private key.
+ PrivateKey *ecdsa.PrivateKey
// MaxPeers is the maximum number of peers that can be
// connected. It must be greater than zero.
MaxPeers int
+ // Name sets the node name of this server.
+ // Use MakeName to create a name that follows existing conventions.
+ Name string
+
+ // Bootstrap nodes are used to establish connectivity
+ // with the rest of the network.
+ BootstrapNodes []discover.Node
+
// Protocols should contain the protocols supported
// by the server. Matching protocols are launched for
// each peer.
@@ -62,22 +81,23 @@ type Server struct {
// If NoDial is true, the server will not dial any peers.
NoDial bool
- // Hook for testing. This is useful because we can inhibit
+ // Hooks for testing. These are useful because we can inhibit
// the whole protocol stack.
- newPeerFunc peerFunc
+ handshakeFunc
+ newPeerHook
- lock sync.RWMutex
- running bool
- listener net.Listener
- laddr *net.TCPAddr // real listen addr
- peers []*Peer
- peerSlots chan int
- peerCount int
-
- quit chan struct{}
- wg sync.WaitGroup
- peerConnect chan *peerAddr
- peerDisconnect chan *Peer
+ lock sync.RWMutex
+ running bool
+ listener net.Listener
+ laddr *net.TCPAddr // real listen addr
+ peers map[discover.NodeID]*Peer
+
+ ntab *discover.Table
+
+ quit chan struct{}
+ loopWG sync.WaitGroup // {dial,listen,nat}Loop
+ peerWG sync.WaitGroup // active peer goroutines
+ peerConnect chan *discover.Node
}
// NAT is implemented by NAT traversal methods.
@@ -90,7 +110,8 @@ type NAT interface {
String() string
}
-type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer
+type handshakeFunc func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (discover.NodeID, []byte, error)
+type newPeerHook func(*Peer)
// Peers returns all connected peers.
func (srv *Server) Peers() (peers []*Peer) {
@@ -107,18 +128,15 @@ func (srv *Server) Peers() (peers []*Peer) {
// PeerCount returns the number of connected peers.
func (srv *Server) PeerCount() int {
srv.lock.RLock()
- defer srv.lock.RUnlock()
- return srv.peerCount
+ n := len(srv.peers)
+ srv.lock.RUnlock()
+ return n
}
-// SuggestPeer injects an address into the outbound address pool.
-func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) {
- addr := &peerAddr{ip, uint64(port), nodeID}
- select {
- case srv.peerConnect <- addr:
- default: // don't block
- srvlog.Warnf("peer suggestion %v ignored", addr)
- }
+// SuggestPeer creates a connection to the given Node if it
+// is not already connected.
+func (srv *Server) SuggestPeer(ip net.IP, port int, id discover.NodeID) {
+ srv.peerConnect <- &discover.Node{ID: id, Addr: &net.UDPAddr{IP: ip, Port: port}}
}
// Broadcast sends an RLP-encoded message to all connected peers.
@@ -152,47 +170,47 @@ func (srv *Server) Start() (err error) {
}
srvlog.Infoln("Starting Server")
- // initialize fields
- if srv.Identity == nil {
- return fmt.Errorf("Server.Identity must be set to a non-nil identity")
+ // initialize all the fields
+ if srv.PrivateKey == nil {
+ return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
}
if srv.MaxPeers <= 0 {
return fmt.Errorf("Server.MaxPeers must be > 0")
}
srv.quit = make(chan struct{})
- srv.peers = make([]*Peer, srv.MaxPeers)
- srv.peerSlots = make(chan int, srv.MaxPeers)
- srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize)
- srv.peerDisconnect = make(chan *Peer)
- if srv.newPeerFunc == nil {
- srv.newPeerFunc = newServerPeer
+ srv.peers = make(map[discover.NodeID]*Peer)
+ srv.peerConnect = make(chan *discover.Node)
+
+ if srv.handshakeFunc == nil {
+ srv.handshakeFunc = encHandshake
}
if srv.Blacklist == nil {
srv.Blacklist = NewBlacklist()
}
- if srv.Dialer == nil {
- srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
- }
-
if srv.ListenAddr != "" {
if err := srv.startListening(); err != nil {
return err
}
}
+
+ // dial stuff
+ dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr)
+ if err != nil {
+ return err
+ }
+ srv.ntab = dt
+ if srv.Dialer == nil {
+ srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
+ }
if !srv.NoDial {
- srv.wg.Add(1)
+ srv.loopWG.Add(1)
go srv.dialLoop()
}
+
if srv.NoDial && srv.ListenAddr == "" {
srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
}
- // make all slots available
- for i := range srv.peers {
- srv.peerSlots <- i
- }
- // note: discLoop is not part of WaitGroup
- go srv.discLoop()
srv.running = true
return nil
}
@@ -205,10 +223,10 @@ func (srv *Server) startListening() error {
srv.ListenAddr = listener.Addr().String()
srv.laddr = listener.Addr().(*net.TCPAddr)
srv.listener = listener
- srv.wg.Add(1)
+ srv.loopWG.Add(1)
go srv.listenLoop()
if !srv.laddr.IP.IsLoopback() && srv.NAT != nil {
- srv.wg.Add(1)
+ srv.loopWG.Add(1)
go srv.natLoop(srv.laddr.Port)
}
return nil
@@ -225,57 +243,41 @@ func (srv *Server) Stop() {
srv.running = false
srv.lock.Unlock()
- srvlog.Infoln("Stopping server")
+ srvlog.Infoln("Stopping Server")
+ srv.ntab.Close()
if srv.listener != nil {
// this unblocks listener Accept
srv.listener.Close()
}
close(srv.quit)
- for _, peer := range srv.Peers() {
- peer.Disconnect(DiscQuitting)
- }
- srv.wg.Wait()
+ srv.loopWG.Wait()
- // wait till they actually disconnect
- // this is checked by claiming all peerSlots.
- // slots become available as the peers disconnect.
- for i := 0; i < cap(srv.peerSlots); i++ {
- <-srv.peerSlots
- }
- // terminate discLoop
- close(srv.peerDisconnect)
-}
-
-func (srv *Server) discLoop() {
- for peer := range srv.peerDisconnect {
- srv.removePeer(peer)
+ // No new peers can be added at this point because dialLoop and
+ // listenLoop are down. It is safe to call peerWG.Wait because
+ // peerWG.Add is not called outside of those loops.
+ for _, peer := range srv.peers {
+ peer.Disconnect(DiscQuitting)
}
+ srv.peerWG.Wait()
}
// main loop for adding connections via listening
func (srv *Server) listenLoop() {
- defer srv.wg.Done()
-
+ defer srv.loopWG.Done()
srvlog.Infoln("Listening on", srv.listener.Addr())
for {
- select {
- case slot := <-srv.peerSlots:
- srvlog.Debugf("grabbed slot %v for listening", slot)
- conn, err := srv.listener.Accept()
- if err != nil {
- srv.peerSlots <- slot
- return
- }
- srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot)
- srv.addPeer(conn, nil, slot)
- case <-srv.quit:
+ conn, err := srv.listener.Accept()
+ if err != nil {
return
}
+ srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr())
+ srv.peerWG.Add(1)
+ go srv.startPeer(conn, nil)
}
}
func (srv *Server) natLoop(port int) {
- defer srv.wg.Done()
+ defer srv.loopWG.Done()
for {
srv.updatePortMapping(port)
select {
@@ -314,108 +316,131 @@ func (srv *Server) removePortMapping(port int) {
}
func (srv *Server) dialLoop() {
- defer srv.wg.Done()
- var (
- suggest chan *peerAddr
- slot *int
- slots = srv.peerSlots
- )
+ defer srv.loopWG.Done()
+ refresh := time.NewTicker(refreshPeersInterval)
+ defer refresh.Stop()
+
+ srv.ntab.Bootstrap(srv.BootstrapNodes)
+ go srv.findPeers()
+
+ dialed := make(chan *discover.Node)
+ dialing := make(map[discover.NodeID]bool)
+
+ // TODO: limit number of active dials
+ // TODO: ensure only one findPeers goroutine is running
+ // TODO: pause findPeers when we're at capacity
+
for {
select {
- case i := <-slots:
- // we need a peer in slot i, slot reserved
- slot = &i
- // now we can watch for candidate peers in the next loop
- suggest = srv.peerConnect
- // do not consume more until candidate peer is found
- slots = nil
-
- case desc := <-suggest:
- // candidate peer found, will dial out asyncronously
- // if connection fails slot will be released
- srvlog.DebugDetailf("dial %v (%v)", desc, *slot)
- go srv.dialPeer(desc, *slot)
- // we can watch if more peers needed in the next loop
- slots = srv.peerSlots
- // until then we dont care about candidate peers
- suggest = nil
+ case <-refresh.C:
- case <-srv.quit:
- // give back the currently reserved slot
- if slot != nil {
- srv.peerSlots <- *slot
+ go srv.findPeers()
+
+ case dest := <-srv.peerConnect:
+ srv.lock.Lock()
+ _, isconnected := srv.peers[dest.ID]
+ srv.lock.Unlock()
+ if isconnected || dialing[dest.ID] {
+ continue
}
+
+ dialing[dest.ID] = true
+ srv.peerWG.Add(1)
+ go func() {
+ srv.dialNode(dest)
+ // at this point, the peer has been added
+ // or discarded. either way, we're not dialing it anymore.
+ dialed <- dest
+ }()
+
+ case dest := <-dialed:
+ delete(dialing, dest.ID)
+
+ case <-srv.quit:
+ // TODO: maybe wait for active dials
return
}
}
}
-// connect to peer via dial out
-func (srv *Server) dialPeer(desc *peerAddr, slot int) {
- srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot)
- conn, err := srv.Dialer.Dial(desc.Network(), desc.String())
+func (srv *Server) dialNode(dest *discover.Node) {
+ srvlog.Debugf("Dialing %v\n", dest.Addr)
+ conn, err := srv.Dialer.Dial("tcp", dest.Addr.String())
if err != nil {
srvlog.DebugDetailf("dial error: %v", err)
- srv.peerSlots <- slot
return
}
- go srv.addPeer(conn, desc, slot)
+ srv.startPeer(conn, dest)
}
-// creates the new peer object and inserts it into its slot
-func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer {
- srv.lock.Lock()
- defer srv.lock.Unlock()
- if !srv.running {
+func (srv *Server) findPeers() {
+ far := srv.ntab.Self()
+ for i := range far {
+ far[i] = ^far[i]
+ }
+ closeToSelf := srv.ntab.Lookup(srv.ntab.Self())
+ farFromSelf := srv.ntab.Lookup(far)
+
+ for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
+ if i < len(closeToSelf) {
+ srv.peerConnect <- closeToSelf[i]
+ }
+ if i < len(farFromSelf) {
+ srv.peerConnect <- farFromSelf[i]
+ }
+ }
+}
+
+func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) {
+ // TODO: I/O timeout, handle/store session token
+ remoteID, _, err := srv.handshakeFunc(conn, srv.PrivateKey, dest)
+ if err != nil {
conn.Close()
- srv.peerSlots <- slot // release slot
- return nil
+ srvlog.Debugf("Encryption Handshake with %v failed: %v", conn.RemoteAddr(), err)
+ return
+ }
+
+ ourID := srv.ntab.Self()
+ p := newPeer(conn, srv.Protocols, srv.Name, &ourID, &remoteID)
+ if ok, reason := srv.addPeer(remoteID, p); !ok {
+ p.Disconnect(reason)
+ return
}
- peer := srv.newPeerFunc(srv, conn, desc)
- peer.slot = slot
- srv.peers[slot] = peer
- srv.peerCount++
- go func() { peer.loop(); srv.peerDisconnect <- peer }()
- return peer
+
+ srv.newPeerHook(p)
+ p.run()
+ srv.removePeer(p)
}
-// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
-func (srv *Server) removePeer(peer *Peer) {
+func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
srv.lock.Lock()
defer srv.lock.Unlock()
- srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot)
- if srv.peers[peer.slot] != peer {
- srvlog.Warnln("Invalid peer to remove:", peer)
- return
- }
- // remove from list and index
- srv.peerCount--
- srv.peers[peer.slot] = nil
- // release slot to signal need for a new peer, last!
- srv.peerSlots <- peer.slot
+ switch {
+ case !srv.running:
+ return false, DiscQuitting
+ case len(srv.peers) >= srv.MaxPeers:
+ return false, DiscTooManyPeers
+ case srv.peers[id] != nil:
+ return false, DiscAlreadyConnected
+ case srv.Blacklist.Exists(id[:]):
+ return false, DiscUselessPeer
+ case id == srv.ntab.Self():
+ return false, DiscSelf
+ }
+ srvlog.Debugf("Adding %v\n", p)
+ srv.peers[id] = p
+ return true, 0
}
-func (srv *Server) verifyPeer(addr *peerAddr) error {
- if srv.Blacklist.Exists(addr.Pubkey) {
- return errors.New("blacklisted")
- }
- if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) {
- return newPeerError(errPubkeyForbidden, "not allowed to connect to srv")
- }
- srv.lock.RLock()
- defer srv.lock.RUnlock()
- for _, peer := range srv.peers {
- if peer != nil {
- id := peer.Identity()
- if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) {
- return errors.New("already connected")
- }
- }
- }
- return nil
+// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
+func (srv *Server) removePeer(p *Peer) {
+ srvlog.Debugf("Removing %v\n", p)
+ srv.lock.Lock()
+ delete(srv.peers, *p.remoteID)
+ srv.lock.Unlock()
+ srv.peerWG.Done()
}
-// TODO replace with "Set"
type Blacklist interface {
Get([]byte) (bool, error)
Put([]byte) error