diff options
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 127 |
1 files changed, 99 insertions, 28 deletions
@@ -5,13 +5,14 @@ import ( "github.com/ethereum/ethwire-go" "log" "net" + "strconv" "sync/atomic" "time" ) const ( // The size of the output buffer for writing messages - outputBufferSize = 50 + outputBufferSize = 50 ) type Peer struct { @@ -20,13 +21,13 @@ type Peer struct { // Net connection conn net.Conn // Output queue which is used to communicate and handle messages - outputQueue chan *ethwire.InOutMsg + outputQueue chan *ethwire.Msg // Quit channel quit chan bool // Determines whether it's an inbound or outbound peer inbound bool // Flag for checking the peer's connectivity state - connected int32 + connected int32 disconnect int32 // Last known message send lastSend time.Time @@ -34,11 +35,17 @@ type Peer struct { // This flag is used by writeMessage to check if messages are allowed // to be send or not. If no version is known all messages are ignored. versionKnown bool + + // Last received pong message + lastPong int64 + // Indicates whether a MsgGetPeersTy was requested of the peer + // this to prevent receiving false peers. + requestedPeerList bool } func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { return &Peer{ - outputQueue: make(chan *ethwire.InOutMsg, outputBufferSize), + outputQueue: make(chan *ethwire.Msg, outputBufferSize), quit: make(chan bool), server: server, conn: conn, @@ -50,7 +57,7 @@ func NewPeer(conn net.Conn, server *Server, inbound bool) *Peer { func NewOutboundPeer(addr string, server *Server) *Peer { p := &Peer{ - outputQueue: make(chan *ethwire.InOutMsg, outputBufferSize), + outputQueue: make(chan *ethwire.Msg, outputBufferSize), quit: make(chan bool), server: server, inbound: false, @@ -79,19 +86,19 @@ func NewOutboundPeer(addr string, server *Server) *Peer { } // Outputs any RLP encoded data to the peer -func (p *Peer) QueueMessage(msg *ethwire.InOutMsg) { +func (p *Peer) QueueMessage(msg *ethwire.Msg) { p.outputQueue <- msg } -func (p *Peer) writeMessage(msg *ethwire.InOutMsg) { +func (p *Peer) writeMessage(msg *ethwire.Msg) { // Ignore the write if we're not connected if atomic.LoadInt32(&p.connected) != 1 { return } if !p.versionKnown { - switch msg.MsgType { - case "verack": // Ok + switch msg.Type { + case ethwire.MsgHandshakeTy: // Ok default: // Anything but ack is allowed return } @@ -108,6 +115,8 @@ func (p *Peer) writeMessage(msg *ethwire.InOutMsg) { // Outbound message handler. Outbound messages are handled here func (p *Peer) HandleOutbound() { + // The ping timer. Makes sure that every 2 minutes a ping is send to the peer + tickleTimer := time.NewTicker(2 * time.Minute) out: for { select { @@ -116,6 +125,10 @@ out: p.writeMessage(msg) p.lastSend = time.Now() + + case <-tickleTimer.C: + p.writeMessage(ðwire.Msg{Type: ethwire.MsgPingTy}) + // Break out of the for loop if a quit message is posted case <-p.quit: break out @@ -126,7 +139,7 @@ clean: // This loop is for draining the output queue and anybody waiting for us for { select { - case <- p.outputQueue: + case <-p.outputQueue: // TODO default: break clean @@ -148,23 +161,47 @@ out: } if Debug { - log.Printf("Received %s\n", msg.MsgType) + log.Printf("Received %s\n", msg.Type.String()) } - // TODO Hash data and check if for existence (= ignore) - - switch msg.MsgType { - case "verack": + switch msg.Type { + case ethwire.MsgHandshakeTy: // Version message - p.handleVersionAck(msg) - case "block": - err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(msg.Data)) + p.handleHandshake(msg) + case ethwire.MsgBlockTy: + err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(ethutil.Encode(msg.Data))) if err != nil { log.Println(err) } - case "blockmine": - d, _ := ethutil.Decode(msg.Data, 0) - log.Printf("block mined %s\n", d) + case ethwire.MsgTxTy: + //p.server.blockManager.AddToTransactionPool(ethutil.NewTransactionFromData(ethutil.Encode(msg.Data))) + case ethwire.MsgInvTy: + case ethwire.MsgGetPeersTy: + p.requestedPeerList = true + // Peer asked for list of connected peers + p.pushPeers() + case ethwire.MsgPeersTy: + // Received a list of peers (probably because MsgGetPeersTy was send) + // Only act on message if we actually requested for a peers list + if p.requestedPeerList { + data := ethutil.Conv(msg.Data) + // Create new list of possible peers for the server to process + peers := make([]string, data.Length()) + // Parse each possible peer + for i := 0; i < data.Length(); i++ { + peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint())) + } + + // Connect to the list of peers + p.server.ProcessPeerList(peers) + // Mark unrequested again + p.requestedPeerList = false + } + case ethwire.MsgPingTy: + // Respond back with pong + p.QueueMessage(ðwire.Msg{Type: ethwire.MsgPongTy}) + case ethwire.MsgPongTy: + p.lastPong = time.Now().Unix() } } @@ -173,7 +210,7 @@ out: func (p *Peer) Start() { if !p.inbound { - err := p.pushVersionAck() + err := p.pushHandshake() if err != nil { log.Printf("Peer can't send outbound version ack", err) @@ -200,17 +237,35 @@ func (p *Peer) Stop() { log.Println("Peer shutdown") } -func (p *Peer) pushVersionAck() error { - msg := ethwire.NewMessage("verack", p.server.Nonce, []byte("01")) +func (p *Peer) pushHandshake() error { + msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, ethutil.Encode([]interface{}{ + 1, 0, p.server.Nonce, + })) p.QueueMessage(msg) return nil } -func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) { - // Detect self connect - if msg.Nonce == p.server.Nonce { +// Pushes the list of outbound peers to the client when requested +func (p *Peer) pushPeers() { + outPeers := make([]interface{}, len(p.server.OutboundPeers())) + // Serialise each peer + for i, peer := range p.server.OutboundPeers() { + outPeers[i] = peer.RlpEncode() + } + + // Send message to the peer with the known list of connected clients + msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers)) + + p.QueueMessage(msg) +} + +func (p *Peer) handleHandshake(msg *ethwire.Msg) { + c := ethutil.Conv(msg.Data) + // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID] + if c.Get(2).AsUint() == p.server.Nonce { + //if msg.Nonce == p.server.Nonce { log.Println("Peer connected to self, disconnecting") p.Stop() @@ -222,7 +277,7 @@ func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) { // If this is an inbound connection send an ack back if p.inbound { - err := p.pushVersionAck() + err := p.pushHandshake() if err != nil { log.Println("Peer can't send ack back") @@ -230,3 +285,19 @@ func (p *Peer) handleVersionAck(msg *ethwire.InOutMsg) { } } } + +func (p *Peer) RlpEncode() []byte { + host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + if err != nil { + return nil + } + + i, err := strconv.Atoi(prt) + if err != nil { + return nil + } + + port := ethutil.NumberToBytes(uint16(i), 16) + + return ethutil.Encode([]interface{}{host, port}) +} |