diff options
-rw-r--r-- | ethereum.go | 107 | ||||
-rw-r--r-- | peer.go | 35 |
2 files changed, 91 insertions, 51 deletions
diff --git a/ethereum.go b/ethereum.go index a7a2f6b8c..eab40e93d 100644 --- a/ethereum.go +++ b/ethereum.go @@ -9,6 +9,7 @@ import ( "log" "net" "strconv" + "sync" "sync/atomic" "time" ) @@ -45,9 +46,14 @@ type Ethereum struct { Addr net.Addr nat NAT + + peerMut sync.Mutex + + // Capabilities for outgoing peers + serverCaps Caps } -func New() (*Ethereum, error) { +func New(caps Caps) (*Ethereum, error) { //db, err := ethdb.NewLDBDatabase() db, err := ethdb.NewMemDatabase() if err != nil { @@ -56,12 +62,11 @@ func New() (*Ethereum, error) { ethutil.Config.Db = db - /* - nat, err := Discover() - if err != nil { - log.Printf("Can'them discover upnp: %v", err) - } - */ + nat, err := Discover() + if err != nil { + log.Printf("Can't discover upnp: %v", err) + } + log.Println(nat) nonce, _ := ethutil.RandomUint64() ethereum := &Ethereum{ @@ -69,7 +74,8 @@ func New() (*Ethereum, error) { db: db, peers: list.New(), Nonce: nonce, - //nat: nat, + serverCaps: caps, + nat: nat, } ethereum.TxPool = ethchain.NewTxPool() ethereum.TxPool.Speaker = ethereum @@ -85,13 +91,8 @@ func (s *Ethereum) AddPeer(conn net.Conn) { peer := NewPeer(conn, s, true) if peer != nil { - if s.peers.Len() > 25 { - log.Println("SEED") - peer.Start(true) - } else { - s.peers.PushBack(peer) - peer.Start(false) - } + s.peers.PushBack(peer) + peer.Start(false) } } @@ -122,7 +123,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error { return nil } - peer := NewOutboundPeer(addr, s) + peer := NewOutboundPeer(addr, s, s.serverCaps) s.peers.PushBack(peer) @@ -158,12 +159,18 @@ func (s *Ethereum) InboundPeers() []*Peer { } func (s *Ethereum) InOutPeers() []*Peer { + // Reap the dead peers first + s.reapPeers() + // Create a new peer slice with at least the length of the total peers inboundPeers := make([]*Peer, s.peers.Len()) length := 0 eachPeer(s.peers, func(p *Peer, e *list.Element) { - inboundPeers[length] = p - length++ + // Only return peers with an actual ip + if len(p.host) > 0 { + inboundPeers[length] = p + length++ + } }) return inboundPeers[:length] @@ -171,6 +178,10 @@ func (s *Ethereum) InOutPeers() []*Peer { func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) { msg := ethwire.NewMessage(msgType, data) + s.BroadcastMsg(msg) +} + +func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) { eachPeer(s.peers, func(p *Peer, e *list.Element) { p.QueueMessage(msg) }) @@ -180,15 +191,25 @@ func (s *Ethereum) Peers() *list.List { return s.peers } -func (s *Ethereum) ReapDeadPeers() { - for { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { - s.peers.Remove(e) - } - }) +func (s *Ethereum) reapPeers() { + s.peerMut.Lock() + defer s.peerMut.Unlock() + + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { + s.peers.Remove(e) + } + }) +} - time.Sleep(processReapingTimeout * time.Second) +func (s *Ethereum) ReapDeadPeerHandler() { + reapTimer := time.NewTicker(processReapingTimeout * time.Second) + + for { + select { + case <-reapTimer.C: + s.reapPeers() + } } } @@ -241,29 +262,33 @@ func (s *Ethereum) Start() { } else { s.Addr = ln.Addr() // Starting accepting connections - go func() { - log.Println("Ready and accepting connections") - - for { - conn, err := ln.Accept() - if err != nil { - log.Println(err) - - continue - } - - go s.AddPeer(conn) - } - }() + log.Println("Ready and accepting connections") + // Start the peer handler + go s.peerHandler(ln) } + go s.upnpUpdateThread() + // Start the reaping processes - go s.ReapDeadPeers() + go s.ReapDeadPeerHandler() // Start the tx pool s.TxPool.Start() } +func (s *Ethereum) peerHandler(listener net.Listener) { + for { + conn, err := listener.Accept() + if err != nil { + log.Println(err) + + continue + } + + go s.AddPeer(conn) + } +} + func (s *Ethereum) Stop() { // Close the database defer s.db.Close() @@ -24,6 +24,8 @@ const ( CapDiscoveryTy = 0x01 CapTxTy = 0x02 CapChainTy = 0x04 + + CapDefault = CapChainTy | CapTxTy | CapDiscoveryTy ) var capsToString = map[Caps]string{ @@ -95,7 +97,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { } } -func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer { +func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { p := &Peer{ outputQueue: make(chan *ethwire.Msg, outputBufferSize), quit: make(chan bool), @@ -103,6 +105,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer { inbound: false, connected: 0, disconnect: 0, + caps: caps, } // Set up the connection in another goroutine so we don't block the main thread @@ -165,7 +168,8 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { // Outbound message handler. Outbound messages are handled here func (p *Peer) HandleOutbound() { // The ping timer. Makes sure that every 2 minutes a ping is send to the peer - tickleTimer := time.NewTicker(2 * time.Minute) + pingTimer := time.NewTicker(2 * time.Minute) + serviceTimer := time.NewTicker(5 * time.Second) out: for { select { @@ -175,11 +179,20 @@ out: p.lastSend = time.Now() - case <-tickleTimer.C: + // Ping timer sends a ping to the peer each 2 minutes + case <-pingTimer.C: p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, "")) - // Break out of the for loop if a quit message is posted + // Service timer takes care of peer broadcasting, transaction + // posting or block posting + case <-serviceTimer.C: + if p.caps&CapDiscoveryTy > 0 { + msg := p.peersMessage() + p.ethereum.BroadcastMsg(msg) + } + case <-p.quit: + // Break out of the for loop if a quit message is posted break out } } @@ -387,7 +400,7 @@ func (p *Peer) Stop() { func (p *Peer) pushHandshake() error { msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ - uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", CapChainTy | CapTxTy | CapDiscoveryTy, p.port, + uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", p.caps, p.port, }) p.QueueMessage(msg) @@ -395,18 +408,20 @@ func (p *Peer) pushHandshake() error { return nil } -// Pushes the list of outbound peers to the client when requested -func (p *Peer) pushPeers() { +func (p *Peer) peersMessage() *ethwire.Msg { outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) // Serialise each peer for i, peer := range p.ethereum.InOutPeers() { outPeers[i] = peer.RlpData() } - // Send message to the peer with the known list of connected clients - msg := ethwire.NewMessage(ethwire.MsgPeersTy, outPeers) + // Return the message to the peer with the known list of connected clients + return ethwire.NewMessage(ethwire.MsgPeersTy, outPeers) +} - p.QueueMessage(msg) +// Pushes the list of outbound peers to the client when requested +func (p *Peer) pushPeers() { + p.QueueMessage(p.peersMessage()) } func (p *Peer) handleHandshake(msg *ethwire.Msg) { |