aboutsummaryrefslogtreecommitdiffstats
path: root/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'peer.go')
-rw-r--r--peer.go127
1 files changed, 99 insertions, 28 deletions
diff --git a/peer.go b/peer.go
index 8f68a9bec..158541028 100644
--- a/peer.go
+++ b/peer.go
@@ -5,13 +5,14 @@ import (
"github.com/ethereum/ethwire-go"
"log"
"net"
+ "strconv"
"sync/atomic"
"time"
)
const (
// The size of the output buffer for writing messages
- outputBufferSize = 50
+ outputBufferSize = 50
)
type Peer struct {
@@ -20,13 +21,13 @@ type Peer struct {
// Net connection
conn net.Conn
// Output queue which is used to communicate and handle messages
- outputQueue chan *ethwire.InOutMsg
+ outputQueue chan *ethwire.Msg
// Quit channel
quit chan bool
// Determines whether it's an inbound or outbound peer
inbound bool
// Flag for checking the peer's connectivity state
- connected int32
+ connected int32
disconnect int32
// Last known message send
lastSend time.Time
@@ -34,11 +35,17 @@ type Peer struct {
// This flag is used by writeMessage to check if messages are allowed
// to be send or not. If no version is known all messages are ignored.
versionKnown bool
+
+ // Last received pong message
+ lastPong int64
+ // Indicates whether a MsgGetPeersTy was requested of the peer
+ // this to prevent receiving false peers.
+ requestedPeerList bool
}
func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer {
return &Peer{
- outputQueue: make(chan *ethwire.InOutMsg, outputBufferSize),
+ outputQueue: make(chan *ethwire.Msg, outputBufferSize),
quit: make(chan bool),
server: server,
conn: conn,
@@ -50,7 +57,7 @@ func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer {
func NewOutboundPeer(addr string, server *Server) *Peer {
p := &Peer{
- outputQueue: make(chan *ethwire.InOutMsg, outputBufferSize),
+ outputQueue: make(chan *ethwire.Msg, outputBufferSize),
quit: make(chan bool),
server: server,
inbound: false,
@@ -79,19 +86,19 @@ func NewOutboundPeer(addr string, server *Server) *Peer {
}
// Outputs any RLP encoded data to the peer
-func (p *Peer) QueueMessage(msg *ethwire.InOutMsg) {
+func (p *Peer) QueueMessage(msg *ethwire.Msg) {
p.outputQueue <- msg
}
-func (p *Peer) writeMessage(msg *ethwire.InOutMsg) {
+func (p *Peer) writeMessage(msg *ethwire.Msg) {
// Ignore the write if we're not connected
if atomic.LoadInt32(&p.connected) != 1 {
return
}
if !p.versionKnown {
- switch msg.MsgType {
- case "verack": // Ok
+ switch msg.Type {
+ case ethwire.MsgHandshakeTy: // Ok
default: // Anything but ack is allowed
return
}
@@ -108,6 +115,8 @@ func (p *Peer) writeMessage(msg *ethwire.InOutMsg) {
// Outbound message handler. Outbound messages are handled here
func (p *Peer) HandleOutbound() {
+ // The ping timer. Makes sure that every 2 minutes a ping is send to the peer
+ tickleTimer := time.NewTicker(2 * time.Minute)
out:
for {
select {
@@ -116,6 +125,10 @@ out:
p.writeMessage(msg)
p.lastSend = time.Now()
+
+ case <-tickleTimer.C:
+ p.writeMessage(&ethwire.Msg{Type: ethwire.MsgPingTy})
+
// Break out of the for loop if a quit message is posted
case <-p.quit:
break out
@@ -126,7 +139,7 @@ clean:
// This loop is for draining the output queue and anybody waiting for us
for {
select {
- case <- p.outputQueue:
+ case <-p.outputQueue:
// TODO
default:
break clean
@@ -148,23 +161,47 @@ out:
}
if Debug {
- log.Printf("Received %s\n", msg.MsgType)
+ log.Printf("Received %s\n", msg.Type.String())
}
- // TODO Hash data and check if for existence (= ignore)
-
- switch msg.MsgType {
- case "verack":
+ switch msg.Type {
+ case ethwire.MsgHandshakeTy:
// Version message
- p.handleVersionAck(msg)
- case "block":
- err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(msg.Data))
+ p.handleHandshake(msg)
+ case ethwire.MsgBlockTy:
+ err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(ethutil.Encode(msg.Data)))
if err != nil {
log.Println(err)
}
- case "blockmine":
- d, _ := ethutil.Decode(msg.Data, 0)
- log.Printf("block mined %s\n", d)
+ case ethwire.MsgTxTy:
+ //p.server.blockManager.AddToTransactionPool(ethutil.NewTransactionFromData(ethutil.Encode(msg.Data)))
+ case ethwire.MsgInvTy:
+ case ethwire.MsgGetPeersTy:
+ p.requestedPeerList = true
+ // Peer asked for list of connected peers
+ p.pushPeers()
+ case ethwire.MsgPeersTy:
+ // Received a list of peers (probably because MsgGetPeersTy was send)
+ // Only act on message if we actually requested for a peers list
+ if p.requestedPeerList {
+ data := ethutil.Conv(msg.Data)
+ // Create new list of possible peers for the server to process
+ peers := make([]string, data.Length())
+ // Parse each possible peer
+ for i := 0; i < data.Length(); i++ {
+ peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint()))
+ }
+
+ // Connect to the list of peers
+ p.server.ProcessPeerList(peers)
+ // Mark unrequested again
+ p.requestedPeerList = false
+ }
+ case ethwire.MsgPingTy:
+ // Respond back with pong
+ p.QueueMessage(&ethwire.Msg{Type: ethwire.MsgPongTy})
+ case ethwire.MsgPongTy:
+ p.lastPong = time.Now().Unix()
}
}
@@ -173,7 +210,7 @@ out:
func (p *Peer) Start() {
if !p.inbound {
- err := p.pushVersionAck()
+ err := p.pushHandshake()
if err != nil {
log.Printf("Peer can't send outbound version ack", err)
@@ -200,17 +237,35 @@ func (p *Peer) Stop() {
log.Println("Peer shutdown")
}
-func (p *Peer) pushVersionAck() error {
- msg := ethwire.NewMessage("verack", p.server.Nonce, []byte("01"))
+func (p *Peer) pushHandshake() error {
+ msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, ethutil.Encode([]interface{}{
+ 1, 0, p.server.Nonce,
+ }))
p.QueueMessage(msg)
return nil
}
-func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) {
- // Detect self connect
- if msg.Nonce == p.server.Nonce {
+// Pushes the list of outbound peers to the client when requested
+func (p *Peer) pushPeers() {
+ outPeers := make([]interface{}, len(p.server.OutboundPeers()))
+ // Serialise each peer
+ for i, peer := range p.server.OutboundPeers() {
+ outPeers[i] = peer.RlpEncode()
+ }
+
+ // Send message to the peer with the known list of connected clients
+ msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers))
+
+ p.QueueMessage(msg)
+}
+
+func (p *Peer) handleHandshake(msg *ethwire.Msg) {
+ c := ethutil.Conv(msg.Data)
+ // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID]
+ if c.Get(2).AsUint() == p.server.Nonce {
+ //if msg.Nonce == p.server.Nonce {
log.Println("Peer connected to self, disconnecting")
p.Stop()
@@ -222,7 +277,7 @@ func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) {
// If this is an inbound connection send an ack back
if p.inbound {
- err := p.pushVersionAck()
+ err := p.pushHandshake()
if err != nil {
log.Println("Peer can't send ack back")
@@ -230,3 +285,19 @@ func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) {
}
}
}
+
+func (p *Peer) RlpEncode() []byte {
+ host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String())
+ if err != nil {
+ return nil
+ }
+
+ i, err := strconv.Atoi(prt)
+ if err != nil {
+ return nil
+ }
+
+ port := ethutil.NumberToBytes(uint16(i), 16)
+
+ return ethutil.Encode([]interface{}{host, port})
+}