diff options
author | obscuren <geffobscura@gmail.com> | 2014-01-10 06:15:51 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-01-10 06:15:51 +0800 |
commit | 849408dda60fe32d7abb78d103b09ca0bc7b5a60 (patch) | |
tree | b066b834e8e7b2182977fc9f784c821fad47e380 | |
parent | 01740695cda7fe27c53f0fa078732fa5a15a88a5 (diff) | |
download | dexon-849408dda60fe32d7abb78d103b09ca0bc7b5a60.tar.gz dexon-849408dda60fe32d7abb78d103b09ca0bc7b5a60.tar.zst dexon-849408dda60fe32d7abb78d103b09ca0bc7b5a60.zip |
Peer handling
-rw-r--r-- | peer.go | 93 | ||||
-rw-r--r-- | server.go | 20 |
2 files changed, 111 insertions, 2 deletions
diff --git a/peer.go b/peer.go new file mode 100644 index 000000000..0c8d38772 --- /dev/null +++ b/peer.go @@ -0,0 +1,93 @@ +package main + +import ( + "net" + "errors" + "log" +) + +type InMsg struct { + msgType string // Specifies how the encoded data should be interpreted + data []byte // RLP encoded data +} + +func ReadMessage(conn net.Conn) (*InMsg, error) { + buff := make([]byte, 4069) + + // Wait for a message from this peer + n, err := conn.Read(buff) + if err != nil { + return nil, err + } else if n == 0 { + return nil, errors.New("Empty message received") + } + + // Read the header (MAX n) + decoder := NewRlpDecoder(buff[:n]) + t := decoder.Get(0).AsString() + if t == "" { + return nil, errors.New("Data contained no data type") + } + + return &InMsg{msgType: t, data: decoder.Get(1).AsBytes()}, nil +} + +type OutMsg struct { + data []byte +} + +type Peer struct { + server *Server + conn net.Conn + outputQueue chan OutMsg + quit chan bool +} + +func NewPeer(conn net.Conn, server *Server) *Peer { + return &Peer{ + outputQueue: make(chan OutMsg, 1), // Buffered chan of 1 is enough + quit: make(chan bool), + + server: server, + conn: conn, + } +} + +// Outputs any RLP encoded data to the peer +func (p *Peer) QueueMessage(data []byte) { + p.outputQueue <- OutMsg{data: data} +} + +func (p *Peer) HandleOutbound() { +out: + for { + switch { + case <- p.quit: + break out + } + } +} + +func (p *Peer) HandleInbound() { + defer p.conn.Close() + +out: + for { + msg, err := ReadMessage(p.conn) + if err != nil { + log.Println(err) + + break out + } + + log.Println(msg) + } + + // Notify the out handler we're quiting + p.quit <- true +} + +func (p *Peer) Start() { + go p.HandleOutbound() + go p.HandleInbound() +} @@ -2,7 +2,8 @@ package main import ( "container/list" - "time" + "net" + "log" ) var Db *LDBDatabase @@ -36,12 +37,27 @@ func NewServer() (*Server, error) { return server, nil } +func (s *Server) AddPeer(conn net.Conn) { + s.peers.PushBack(NewPeer(conn, s)) +} + // Start the server func (s *Server) Start() { // For now this function just blocks the main thread + ln, err := net.Listen("tcp", ":12345") + if err != nil { + log.Fatal(err) + } + go func() { for { - time.Sleep( time.Second ) + conn, err := ln.Accept() + if err != nil { + log.Println(err) + continue + } + + go s.AddPeer(conn) } }() } |