aboutsummaryrefslogtreecommitdiffstats
path: root/ethereum.go
diff options
context:
space:
mode:
Diffstat (limited to 'ethereum.go')
-rw-r--r--ethereum.go107
1 files changed, 66 insertions, 41 deletions
diff --git a/ethereum.go b/ethereum.go
index a7a2f6b8c..eab40e93d 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -9,6 +9,7 @@ import (
"log"
"net"
"strconv"
+ "sync"
"sync/atomic"
"time"
)
@@ -45,9 +46,14 @@ type Ethereum struct {
Addr net.Addr
nat NAT
+
+ peerMut sync.Mutex
+
+ // Capabilities for outgoing peers
+ serverCaps Caps
}
-func New() (*Ethereum, error) {
+func New(caps Caps) (*Ethereum, error) {
//db, err := ethdb.NewLDBDatabase()
db, err := ethdb.NewMemDatabase()
if err != nil {
@@ -56,12 +62,11 @@ func New() (*Ethereum, error) {
ethutil.Config.Db = db
- /*
- nat, err := Discover()
- if err != nil {
- log.Printf("Can'them discover upnp: %v", err)
- }
- */
+ nat, err := Discover()
+ if err != nil {
+ log.Printf("Can't discover upnp: %v", err)
+ }
+ log.Println(nat)
nonce, _ := ethutil.RandomUint64()
ethereum := &Ethereum{
@@ -69,7 +74,8 @@ func New() (*Ethereum, error) {
db: db,
peers: list.New(),
Nonce: nonce,
- //nat: nat,
+ serverCaps: caps,
+ nat: nat,
}
ethereum.TxPool = ethchain.NewTxPool()
ethereum.TxPool.Speaker = ethereum
@@ -85,13 +91,8 @@ func (s *Ethereum) AddPeer(conn net.Conn) {
peer := NewPeer(conn, s, true)
if peer != nil {
- if s.peers.Len() > 25 {
- log.Println("SEED")
- peer.Start(true)
- } else {
- s.peers.PushBack(peer)
- peer.Start(false)
- }
+ s.peers.PushBack(peer)
+ peer.Start(false)
}
}
@@ -122,7 +123,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
return nil
}
- peer := NewOutboundPeer(addr, s)
+ peer := NewOutboundPeer(addr, s, s.serverCaps)
s.peers.PushBack(peer)
@@ -158,12 +159,18 @@ func (s *Ethereum) InboundPeers() []*Peer {
}
func (s *Ethereum) InOutPeers() []*Peer {
+ // Reap the dead peers first
+ s.reapPeers()
+
// Create a new peer slice with at least the length of the total peers
inboundPeers := make([]*Peer, s.peers.Len())
length := 0
eachPeer(s.peers, func(p *Peer, e *list.Element) {
- inboundPeers[length] = p
- length++
+ // Only return peers with an actual ip
+ if len(p.host) > 0 {
+ inboundPeers[length] = p
+ length++
+ }
})
return inboundPeers[:length]
@@ -171,6 +178,10 @@ func (s *Ethereum) InOutPeers() []*Peer {
func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) {
msg := ethwire.NewMessage(msgType, data)
+ s.BroadcastMsg(msg)
+}
+
+func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) {
eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.QueueMessage(msg)
})
@@ -180,15 +191,25 @@ func (s *Ethereum) Peers() *list.List {
return s.peers
}
-func (s *Ethereum) ReapDeadPeers() {
- for {
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
- s.peers.Remove(e)
- }
- })
+func (s *Ethereum) reapPeers() {
+ s.peerMut.Lock()
+ defer s.peerMut.Unlock()
+
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
+ s.peers.Remove(e)
+ }
+ })
+}
- time.Sleep(processReapingTimeout * time.Second)
+func (s *Ethereum) ReapDeadPeerHandler() {
+ reapTimer := time.NewTicker(processReapingTimeout * time.Second)
+
+ for {
+ select {
+ case <-reapTimer.C:
+ s.reapPeers()
+ }
}
}
@@ -241,29 +262,33 @@ func (s *Ethereum) Start() {
} else {
s.Addr = ln.Addr()
// Starting accepting connections
- go func() {
- log.Println("Ready and accepting connections")
-
- for {
- conn, err := ln.Accept()
- if err != nil {
- log.Println(err)
-
- continue
- }
-
- go s.AddPeer(conn)
- }
- }()
+ log.Println("Ready and accepting connections")
+ // Start the peer handler
+ go s.peerHandler(ln)
}
+ go s.upnpUpdateThread()
+
// Start the reaping processes
- go s.ReapDeadPeers()
+ go s.ReapDeadPeerHandler()
// Start the tx pool
s.TxPool.Start()
}
+func (s *Ethereum) peerHandler(listener net.Listener) {
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ log.Println(err)
+
+ continue
+ }
+
+ go s.AddPeer(conn)
+ }
+}
+
func (s *Ethereum) Stop() {
// Close the database
defer s.db.Close()