aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ethereum.go107
-rw-r--r--peer.go35
2 files changed, 91 insertions, 51 deletions
diff --git a/ethereum.go b/ethereum.go
index a7a2f6b8c..eab40e93d 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -9,6 +9,7 @@ import (
"log"
"net"
"strconv"
+ "sync"
"sync/atomic"
"time"
)
@@ -45,9 +46,14 @@ type Ethereum struct {
Addr net.Addr
nat NAT
+
+ peerMut sync.Mutex
+
+ // Capabilities for outgoing peers
+ serverCaps Caps
}
-func New() (*Ethereum, error) {
+func New(caps Caps) (*Ethereum, error) {
//db, err := ethdb.NewLDBDatabase()
db, err := ethdb.NewMemDatabase()
if err != nil {
@@ -56,12 +62,11 @@ func New() (*Ethereum, error) {
ethutil.Config.Db = db
- /*
- nat, err := Discover()
- if err != nil {
- log.Printf("Can'them discover upnp: %v", err)
- }
- */
+ nat, err := Discover()
+ if err != nil {
+ log.Printf("Can't discover upnp: %v", err)
+ }
+ log.Println(nat)
nonce, _ := ethutil.RandomUint64()
ethereum := &Ethereum{
@@ -69,7 +74,8 @@ func New() (*Ethereum, error) {
db: db,
peers: list.New(),
Nonce: nonce,
- //nat: nat,
+ serverCaps: caps,
+ nat: nat,
}
ethereum.TxPool = ethchain.NewTxPool()
ethereum.TxPool.Speaker = ethereum
@@ -85,13 +91,8 @@ func (s *Ethereum) AddPeer(conn net.Conn) {
peer := NewPeer(conn, s, true)
if peer != nil {
- if s.peers.Len() > 25 {
- log.Println("SEED")
- peer.Start(true)
- } else {
- s.peers.PushBack(peer)
- peer.Start(false)
- }
+ s.peers.PushBack(peer)
+ peer.Start(false)
}
}
@@ -122,7 +123,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
return nil
}
- peer := NewOutboundPeer(addr, s)
+ peer := NewOutboundPeer(addr, s, s.serverCaps)
s.peers.PushBack(peer)
@@ -158,12 +159,18 @@ func (s *Ethereum) InboundPeers() []*Peer {
}
func (s *Ethereum) InOutPeers() []*Peer {
+ // Reap the dead peers first
+ s.reapPeers()
+
// Create a new peer slice with at least the length of the total peers
inboundPeers := make([]*Peer, s.peers.Len())
length := 0
eachPeer(s.peers, func(p *Peer, e *list.Element) {
- inboundPeers[length] = p
- length++
+ // Only return peers with an actual ip
+ if len(p.host) > 0 {
+ inboundPeers[length] = p
+ length++
+ }
})
return inboundPeers[:length]
@@ -171,6 +178,10 @@ func (s *Ethereum) InOutPeers() []*Peer {
func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) {
msg := ethwire.NewMessage(msgType, data)
+ s.BroadcastMsg(msg)
+}
+
+func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) {
eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.QueueMessage(msg)
})
@@ -180,15 +191,25 @@ func (s *Ethereum) Peers() *list.List {
return s.peers
}
-func (s *Ethereum) ReapDeadPeers() {
- for {
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
- s.peers.Remove(e)
- }
- })
+func (s *Ethereum) reapPeers() {
+ s.peerMut.Lock()
+ defer s.peerMut.Unlock()
+
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
+ s.peers.Remove(e)
+ }
+ })
+}
- time.Sleep(processReapingTimeout * time.Second)
+func (s *Ethereum) ReapDeadPeerHandler() {
+ reapTimer := time.NewTicker(processReapingTimeout * time.Second)
+
+ for {
+ select {
+ case <-reapTimer.C:
+ s.reapPeers()
+ }
}
}
@@ -241,29 +262,33 @@ func (s *Ethereum) Start() {
} else {
s.Addr = ln.Addr()
// Starting accepting connections
- go func() {
- log.Println("Ready and accepting connections")
-
- for {
- conn, err := ln.Accept()
- if err != nil {
- log.Println(err)
-
- continue
- }
-
- go s.AddPeer(conn)
- }
- }()
+ log.Println("Ready and accepting connections")
+ // Start the peer handler
+ go s.peerHandler(ln)
}
+ go s.upnpUpdateThread()
+
// Start the reaping processes
- go s.ReapDeadPeers()
+ go s.ReapDeadPeerHandler()
// Start the tx pool
s.TxPool.Start()
}
+func (s *Ethereum) peerHandler(listener net.Listener) {
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ log.Println(err)
+
+ continue
+ }
+
+ go s.AddPeer(conn)
+ }
+}
+
func (s *Ethereum) Stop() {
// Close the database
defer s.db.Close()
diff --git a/peer.go b/peer.go
index 5d22b545c..a715e205d 100644
--- a/peer.go
+++ b/peer.go
@@ -24,6 +24,8 @@ const (
CapDiscoveryTy = 0x01
CapTxTy = 0x02
CapChainTy = 0x04
+
+ CapDefault = CapChainTy | CapTxTy | CapDiscoveryTy
)
var capsToString = map[Caps]string{
@@ -95,7 +97,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
}
}
-func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer {
+func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
p := &Peer{
outputQueue: make(chan *ethwire.Msg, outputBufferSize),
quit: make(chan bool),
@@ -103,6 +105,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer {
inbound: false,
connected: 0,
disconnect: 0,
+ caps: caps,
}
// Set up the connection in another goroutine so we don't block the main thread
@@ -165,7 +168,8 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) {
// Outbound message handler. Outbound messages are handled here
func (p *Peer) HandleOutbound() {
// The ping timer. Makes sure that every 2 minutes a ping is send to the peer
- tickleTimer := time.NewTicker(2 * time.Minute)
+ pingTimer := time.NewTicker(2 * time.Minute)
+ serviceTimer := time.NewTicker(5 * time.Second)
out:
for {
select {
@@ -175,11 +179,20 @@ out:
p.lastSend = time.Now()
- case <-tickleTimer.C:
+ // Ping timer sends a ping to the peer each 2 minutes
+ case <-pingTimer.C:
p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, ""))
- // Break out of the for loop if a quit message is posted
+ // Service timer takes care of peer broadcasting, transaction
+ // posting or block posting
+ case <-serviceTimer.C:
+ if p.caps&CapDiscoveryTy > 0 {
+ msg := p.peersMessage()
+ p.ethereum.BroadcastMsg(msg)
+ }
+
case <-p.quit:
+ // Break out of the for loop if a quit message is posted
break out
}
}
@@ -387,7 +400,7 @@ func (p *Peer) Stop() {
func (p *Peer) pushHandshake() error {
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
- uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", CapChainTy | CapTxTy | CapDiscoveryTy, p.port,
+ uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", p.caps, p.port,
})
p.QueueMessage(msg)
@@ -395,18 +408,20 @@ func (p *Peer) pushHandshake() error {
return nil
}
-// Pushes the list of outbound peers to the client when requested
-func (p *Peer) pushPeers() {
+func (p *Peer) peersMessage() *ethwire.Msg {
outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
// Serialise each peer
for i, peer := range p.ethereum.InOutPeers() {
outPeers[i] = peer.RlpData()
}
- // Send message to the peer with the known list of connected clients
- msg := ethwire.NewMessage(ethwire.MsgPeersTy, outPeers)
+ // Return the message to the peer with the known list of connected clients
+ return ethwire.NewMessage(ethwire.MsgPeersTy, outPeers)
+}
- p.QueueMessage(msg)
+// Pushes the list of outbound peers to the client when requested
+func (p *Peer) pushPeers() {
+ p.QueueMessage(p.peersMessage())
}
func (p *Peer) handleHandshake(msg *ethwire.Msg) {