diff options
Diffstat (limited to 'ethereum.go')
-rw-r--r-- | ethereum.go | 107 |
1 files changed, 66 insertions, 41 deletions
diff --git a/ethereum.go b/ethereum.go index a7a2f6b8c..eab40e93d 100644 --- a/ethereum.go +++ b/ethereum.go @@ -9,6 +9,7 @@ import ( "log" "net" "strconv" + "sync" "sync/atomic" "time" ) @@ -45,9 +46,14 @@ type Ethereum struct { Addr net.Addr nat NAT + + peerMut sync.Mutex + + // Capabilities for outgoing peers + serverCaps Caps } -func New() (*Ethereum, error) { +func New(caps Caps) (*Ethereum, error) { //db, err := ethdb.NewLDBDatabase() db, err := ethdb.NewMemDatabase() if err != nil { @@ -56,12 +62,11 @@ func New() (*Ethereum, error) { ethutil.Config.Db = db - /* - nat, err := Discover() - if err != nil { - log.Printf("Can'them discover upnp: %v", err) - } - */ + nat, err := Discover() + if err != nil { + log.Printf("Can't discover upnp: %v", err) + } + log.Println(nat) nonce, _ := ethutil.RandomUint64() ethereum := &Ethereum{ @@ -69,7 +74,8 @@ func New() (*Ethereum, error) { db: db, peers: list.New(), Nonce: nonce, - //nat: nat, + serverCaps: caps, + nat: nat, } ethereum.TxPool = ethchain.NewTxPool() ethereum.TxPool.Speaker = ethereum @@ -85,13 +91,8 @@ func (s *Ethereum) AddPeer(conn net.Conn) { peer := NewPeer(conn, s, true) if peer != nil { - if s.peers.Len() > 25 { - log.Println("SEED") - peer.Start(true) - } else { - s.peers.PushBack(peer) - peer.Start(false) - } + s.peers.PushBack(peer) + peer.Start(false) } } @@ -122,7 +123,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error { return nil } - peer := NewOutboundPeer(addr, s) + peer := NewOutboundPeer(addr, s, s.serverCaps) s.peers.PushBack(peer) @@ -158,12 +159,18 @@ func (s *Ethereum) InboundPeers() []*Peer { } func (s *Ethereum) InOutPeers() []*Peer { + // Reap the dead peers first + s.reapPeers() + // Create a new peer slice with at least the length of the total peers inboundPeers := make([]*Peer, s.peers.Len()) length := 0 eachPeer(s.peers, func(p *Peer, e *list.Element) { - inboundPeers[length] = p - length++ + // Only return peers with an actual ip + if len(p.host) > 0 { + inboundPeers[length] = p + length++ + } }) return inboundPeers[:length] @@ -171,6 +178,10 @@ func (s *Ethereum) InOutPeers() []*Peer { func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) { msg := ethwire.NewMessage(msgType, data) + s.BroadcastMsg(msg) +} + +func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) { eachPeer(s.peers, func(p *Peer, e *list.Element) { p.QueueMessage(msg) }) @@ -180,15 +191,25 @@ func (s *Ethereum) Peers() *list.List { return s.peers } -func (s *Ethereum) ReapDeadPeers() { - for { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { - s.peers.Remove(e) - } - }) +func (s *Ethereum) reapPeers() { + s.peerMut.Lock() + defer s.peerMut.Unlock() + + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { + s.peers.Remove(e) + } + }) +} - time.Sleep(processReapingTimeout * time.Second) +func (s *Ethereum) ReapDeadPeerHandler() { + reapTimer := time.NewTicker(processReapingTimeout * time.Second) + + for { + select { + case <-reapTimer.C: + s.reapPeers() + } } } @@ -241,29 +262,33 @@ func (s *Ethereum) Start() { } else { s.Addr = ln.Addr() // Starting accepting connections - go func() { - log.Println("Ready and accepting connections") - - for { - conn, err := ln.Accept() - if err != nil { - log.Println(err) - - continue - } - - go s.AddPeer(conn) - } - }() + log.Println("Ready and accepting connections") + // Start the peer handler + go s.peerHandler(ln) } + go s.upnpUpdateThread() + // Start the reaping processes - go s.ReapDeadPeers() + go s.ReapDeadPeerHandler() // Start the tx pool s.TxPool.Start() } +func (s *Ethereum) peerHandler(listener net.Listener) { + for { + conn, err := listener.Accept() + if err != nil { + log.Println(err) + + continue + } + + go s.AddPeer(conn) + } +} + func (s *Ethereum) Stop() { // Close the database defer s.db.Close() |