aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--peer.go64
-rw-r--r--server.go43
2 files changed, 94 insertions, 13 deletions
diff --git a/peer.go b/peer.go
index 8e6a5d540..158541028 100644
--- a/peer.go
+++ b/peer.go
@@ -5,6 +5,7 @@ import (
"github.com/ethereum/ethwire-go"
"log"
"net"
+ "strconv"
"sync/atomic"
"time"
)
@@ -37,6 +38,9 @@ type Peer struct {
// Last received pong message
lastPong int64
+ // Indicates whether a MsgGetPeersTy was requested of the peer
+ // this to prevent receiving false peers.
+ requestedPeerList bool
}
func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer {
@@ -160,8 +164,6 @@ out:
log.Printf("Received %s\n", msg.Type.String())
}
- // TODO Hash data and check if for existence (= ignore)
-
switch msg.Type {
case ethwire.MsgHandshakeTy:
// Version message
@@ -172,20 +174,34 @@ out:
log.Println(err)
}
case ethwire.MsgTxTy:
+ //p.server.blockManager.AddToTransactionPool(ethutil.NewTransactionFromData(ethutil.Encode(msg.Data)))
case ethwire.MsgInvTy:
case ethwire.MsgGetPeersTy:
+ p.requestedPeerList = true
+ // Peer asked for list of connected peers
+ p.pushPeers()
case ethwire.MsgPeersTy:
+ // Received a list of peers (probably because MsgGetPeersTy was send)
+ // Only act on message if we actually requested for a peers list
+ if p.requestedPeerList {
+ data := ethutil.Conv(msg.Data)
+ // Create new list of possible peers for the server to process
+ peers := make([]string, data.Length())
+ // Parse each possible peer
+ for i := 0; i < data.Length(); i++ {
+ peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint()))
+ }
+
+ // Connect to the list of peers
+ p.server.ProcessPeerList(peers)
+ // Mark unrequested again
+ p.requestedPeerList = false
+ }
case ethwire.MsgPingTy:
// Respond back with pong
- p.writeMessage(&ethwire.Msg{Type: ethwire.MsgPongTy})
+ p.QueueMessage(&ethwire.Msg{Type: ethwire.MsgPongTy})
case ethwire.MsgPongTy:
p.lastPong = time.Now().Unix()
-
- /*
- case "blockmine":
- d, _ := ethutil.Decode(msg.Data, 0)
- log.Printf("block mined %s\n", d)
- */
}
}
@@ -231,6 +247,20 @@ func (p *Peer) pushHandshake() error {
return nil
}
+// Pushes the list of outbound peers to the client when requested
+func (p *Peer) pushPeers() {
+ outPeers := make([]interface{}, len(p.server.OutboundPeers()))
+ // Serialise each peer
+ for i, peer := range p.server.OutboundPeers() {
+ outPeers[i] = peer.RlpEncode()
+ }
+
+ // Send message to the peer with the known list of connected clients
+ msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers))
+
+ p.QueueMessage(msg)
+}
+
func (p *Peer) handleHandshake(msg *ethwire.Msg) {
c := ethutil.Conv(msg.Data)
// [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID]
@@ -255,3 +285,19 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
}
}
}
+
+func (p *Peer) RlpEncode() []byte {
+ host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String())
+ if err != nil {
+ return nil
+ }
+
+ i, err := strconv.Atoi(prt)
+ if err != nil {
+ return nil
+ }
+
+ port := ethutil.NumberToBytes(uint16(i), 16)
+
+ return ethutil.Encode([]interface{}{host, port})
+}
diff --git a/server.go b/server.go
index 9907f3b24..7a29d1bd9 100644
--- a/server.go
+++ b/server.go
@@ -20,6 +20,10 @@ func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
}
}
+const (
+ processReapingTimeout = 60 // TODO increase
+)
+
type Server struct {
// Channel for shutting down the server
shutdownChan chan bool
@@ -66,6 +70,13 @@ func (s *Server) AddPeer(conn net.Conn) {
}
}
+func (s *Server) ProcessPeerList(addrs []string) {
+ for _, addr := range addrs {
+ // TODO Probably requires some sanity checks
+ s.ConnectToPeer(addr)
+ }
+}
+
func (s *Server) ConnectToPeer(addr string) error {
peer := NewOutboundPeer(addr, s)
@@ -74,16 +85,40 @@ func (s *Server) ConnectToPeer(addr string) error {
return nil
}
+func (s *Server) OutboundPeers() []*Peer {
+ // Create a new peer slice with at least the length of the total peers
+ outboundPeers := make([]*Peer, s.peers.Len())
+ length := 0
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ if !p.inbound {
+ outboundPeers[length] = p
+ length++
+ }
+ })
+
+ return outboundPeers[:length]
+}
+
+func (s *Server) InboundPeers() []*Peer {
+ // Create a new peer slice with at least the length of the total peers
+ inboundPeers := make([]*Peer, s.peers.Len())
+ length := 0
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ if p.inbound {
+ inboundPeers[length] = p
+ length++
+ }
+ })
+
+ return inboundPeers[:length]
+}
+
func (s *Server) Broadcast(msgType ethwire.MsgType, data []byte) {
eachPeer(s.peers, func(p *Peer, e *list.Element) {
p.QueueMessage(ethwire.NewMessage(msgType, data))
})
}
-const (
- processReapingTimeout = 1 // TODO increase
-)
-
func (s *Server) ReapDeadPeers() {
for {
eachPeer(s.peers, func(p *Peer, e *list.Element) {