diff options
-rw-r--r-- | .gitignore | 12 | ||||
-rw-r--r-- | ethereum.go | 213 | ||||
-rw-r--r-- | peer.go | 303 |
3 files changed, 528 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..f725d58d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +# See http://help.github.com/ignore-files/ for more about ignoring files. +# +# If you find yourself ignoring temporary files generated by your text editor +# or operating system, you probably want to add a global ignore instead: +# git config --global core.excludesfile ~/.gitignore_global + +/tmp +*/**/*un~ +*un~ +.DS_Store +*/**/.DS_Store + diff --git a/ethereum.go b/ethereum.go new file mode 100644 index 000000000..b1b675c88 --- /dev/null +++ b/ethereum.go @@ -0,0 +1,213 @@ +package eth + +import ( + "container/list" + "github.com/ethereum/ethchain-go" + "github.com/ethereum/ethdb-go" + "github.com/ethereum/ethutil-go" + "github.com/ethereum/ethwire-go" + "log" + "net" + "sync/atomic" + "time" +) + +func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { + // Loop thru the peers and close them (if we had them) + for e := peers.Front(); e != nil; e = e.Next() { + if peer, ok := e.Value.(*Peer); ok { + callback(peer, e) + } + } +} + +const ( + processReapingTimeout = 60 // TODO increase +) + +type Ethereum struct { + // Channel for shutting down the ethereum + shutdownChan chan bool + // DB interface + //db *ethdb.LDBDatabase + db *ethdb.MemDatabase + // Block manager for processing new blocks and managing the block chain + BlockManager *ethchain.BlockManager + // The transaction pool. Transaction can be pushed on this pool + // for later including in the blocks + TxPool *ethchain.TxPool + // Peers (NYI) + peers *list.List + // Nonce + Nonce uint64 +} + +func New() (*Ethereum, error) { + //db, err := ethdb.NewLDBDatabase() + db, err := ethdb.NewMemDatabase() + if err != nil { + return nil, err + } + + ethutil.Config.Db = db + + nonce, _ := ethutil.RandomUint64() + ethereum := &Ethereum{ + shutdownChan: make(chan bool), + db: db, + peers: list.New(), + Nonce: nonce, + } + ethereum.TxPool = ethchain.NewTxPool() + ethereum.TxPool.Speaker = ethereum + ethereum.BlockManager = ethchain.NewBlockManager() + + ethereum.TxPool.BlockManager = ethereum.BlockManager + ethereum.BlockManager.TransactionPool = ethereum.TxPool + + return ethereum, nil +} + +func (s *Ethereum) AddPeer(conn net.Conn) { + peer := NewPeer(conn, s, true) + + if peer != nil { + s.peers.PushBack(peer) + peer.Start() + + log.Println("Peer connected ::", conn.RemoteAddr()) + } +} + +func (s *Ethereum) ProcessPeerList(addrs []string) { + for _, addr := range addrs { + // TODO Probably requires some sanity checks + s.ConnectToPeer(addr) + } +} + +func (s *Ethereum) ConnectToPeer(addr string) error { + peer := NewOutboundPeer(addr, s) + + s.peers.PushBack(peer) + + return nil +} + +func (s *Ethereum) OutboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + outboundPeers := make([]*Peer, s.peers.Len()) + length := 0 + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if !p.inbound { + outboundPeers[length] = p + length++ + } + }) + + return outboundPeers[:length] +} + +func (s *Ethereum) InboundPeers() []*Peer { + // Create a new peer slice with at least the length of the total peers + inboundPeers := make([]*Peer, s.peers.Len()) + length := 0 + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if p.inbound { + inboundPeers[length] = p + length++ + } + }) + + return inboundPeers[:length] +} + +func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []byte) { + eachPeer(s.peers, func(p *Peer, e *list.Element) { + p.QueueMessage(ethwire.NewMessage(msgType, data)) + }) +} + +func (s *Ethereum) ReapDeadPeers() { + for { + eachPeer(s.peers, func(p *Peer, e *list.Element) { + if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { + log.Println("Dead peer found .. reaping") + + s.peers.Remove(e) + } + }) + + time.Sleep(processReapingTimeout * time.Second) + } +} + +// Start the ethereum +func (s *Ethereum) Start() { + // For now this function just blocks the main thread + ln, err := net.Listen("tcp", ":12345") + if err != nil { + // This is mainly for testing to create a "network" + if ethutil.Config.Debug { + log.Println("Connection listening disabled. Acting as client") + + err = s.ConnectToPeer("localhost:12345") + if err != nil { + log.Println("Error starting ethereum", err) + + s.Stop() + } + } else { + log.Fatal(err) + } + } else { + // Starting accepting connections + go func() { + for { + conn, err := ln.Accept() + if err != nil { + log.Println(err) + + continue + } + + go s.AddPeer(conn) + } + }() + } + + // Start the reaping processes + go s.ReapDeadPeers() + + // Start the tx pool + s.TxPool.Start() + + // TMP + /* + go func() { + for { + s.Broadcast("block", s.blockManager.bc.GenesisBlock().RlpEncode()) + + time.Sleep(1000 * time.Millisecond) + } + }() + */ +} + +func (s *Ethereum) Stop() { + // Close the database + defer s.db.Close() + + eachPeer(s.peers, func(p *Peer, e *list.Element) { + p.Stop() + }) + + s.shutdownChan <- true + + s.TxPool.Stop() +} + +// This function will wait for a shutdown and resumes main thread execution +func (s *Ethereum) WaitForShutdown() { + <-s.shutdownChan +} diff --git a/peer.go b/peer.go new file mode 100644 index 000000000..ef9a05ed1 --- /dev/null +++ b/peer.go @@ -0,0 +1,303 @@ +package eth + +import ( + "github.com/ethereum/ethutil-go" + "github.com/ethereum/ethwire-go" + "log" + "net" + "strconv" + "sync/atomic" + "time" +) + +const ( + // The size of the output buffer for writing messages + outputBufferSize = 50 +) + +type Peer struct { + // Ethereum interface + ethereum *Ethereum + // Net connection + conn net.Conn + // Output queue which is used to communicate and handle messages + outputQueue chan *ethwire.Msg + // Quit channel + quit chan bool + // Determines whether it's an inbound or outbound peer + inbound bool + // Flag for checking the peer's connectivity state + connected int32 + disconnect int32 + // Last known message send + lastSend time.Time + // Indicated whether a verack has been send or not + // This flag is used by writeMessage to check if messages are allowed + // to be send or not. If no version is known all messages are ignored. + versionKnown bool + + // Last received pong message + lastPong int64 + // Indicates whether a MsgGetPeersTy was requested of the peer + // this to prevent receiving false peers. + requestedPeerList bool +} + +func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { + return &Peer{ + outputQueue: make(chan *ethwire.Msg, outputBufferSize), + quit: make(chan bool), + ethereum: ethereum, + conn: conn, + inbound: inbound, + disconnect: 0, + connected: 1, + } +} + +func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer { + p := &Peer{ + outputQueue: make(chan *ethwire.Msg, outputBufferSize), + quit: make(chan bool), + ethereum: ethereum, + inbound: false, + connected: 0, + disconnect: 0, + } + + // Set up the connection in another goroutine so we don't block the main thread + go func() { + conn, err := net.Dial("tcp", addr) + if err != nil { + p.Stop() + } + p.conn = conn + + // Atomically set the connection state + atomic.StoreInt32(&p.connected, 1) + atomic.StoreInt32(&p.disconnect, 0) + + log.Println("Connected to peer ::", conn.RemoteAddr()) + + p.Start() + }() + + return p +} + +// Outputs any RLP encoded data to the peer +func (p *Peer) QueueMessage(msg *ethwire.Msg) { + p.outputQueue <- msg +} + +func (p *Peer) writeMessage(msg *ethwire.Msg) { + // Ignore the write if we're not connected + if atomic.LoadInt32(&p.connected) != 1 { + return + } + + if !p.versionKnown { + switch msg.Type { + case ethwire.MsgHandshakeTy: // Ok + default: // Anything but ack is allowed + return + } + } + + err := ethwire.WriteMessage(p.conn, msg) + if err != nil { + log.Println("Can't send message:", err) + // Stop the client if there was an error writing to it + p.Stop() + return + } +} + +// Outbound message handler. Outbound messages are handled here +func (p *Peer) HandleOutbound() { + // The ping timer. Makes sure that every 2 minutes a ping is send to the peer + tickleTimer := time.NewTicker(2 * time.Minute) +out: + for { + select { + // Main message queue. All outbound messages are processed through here + case msg := <-p.outputQueue: + p.writeMessage(msg) + + p.lastSend = time.Now() + + case <-tickleTimer.C: + p.writeMessage(ðwire.Msg{Type: ethwire.MsgPingTy}) + + // Break out of the for loop if a quit message is posted + case <-p.quit: + break out + } + } + +clean: + // This loop is for draining the output queue and anybody waiting for us + for { + select { + case <-p.outputQueue: + // TODO + default: + break clean + } + } +} + +// Inbound handler. Inbound messages are received here and passed to the appropriate methods +func (p *Peer) HandleInbound() { + +out: + for atomic.LoadInt32(&p.disconnect) == 0 { + // Wait for a message from the peer + msg, err := ethwire.ReadMessage(p.conn) + if err != nil { + log.Println(err) + + break out + } + + if ethutil.Config.Debug { + log.Printf("Received %s\n", msg.Type.String()) + } + + switch msg.Type { + case ethwire.MsgHandshakeTy: + // Version message + p.handleHandshake(msg) + case ethwire.MsgBlockTy: + err := p.ethereum.BlockManager.ProcessBlock(ethutil.NewBlock(msg.Data)) + if err != nil { + log.Println(err) + } + case ethwire.MsgTxTy: + p.ethereum.TxPool.QueueTransaction(ethutil.NewTransactionFromData(msg.Data)) + case ethwire.MsgInvTy: + case ethwire.MsgGetPeersTy: + p.requestedPeerList = true + // Peer asked for list of connected peers + p.pushPeers() + case ethwire.MsgPeersTy: + // Received a list of peers (probably because MsgGetPeersTy was send) + // Only act on message if we actually requested for a peers list + if p.requestedPeerList { + data := ethutil.Conv(msg.Data) + // Create new list of possible peers for the ethereum to process + peers := make([]string, data.Length()) + // Parse each possible peer + for i := 0; i < data.Length(); i++ { + peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint())) + } + + // Connect to the list of peers + p.ethereum.ProcessPeerList(peers) + // Mark unrequested again + p.requestedPeerList = false + } + case ethwire.MsgPingTy: + // Respond back with pong + p.QueueMessage(ðwire.Msg{Type: ethwire.MsgPongTy}) + case ethwire.MsgPongTy: + p.lastPong = time.Now().Unix() + } + } + + p.Stop() +} + +func (p *Peer) Start() { + if !p.inbound { + err := p.pushHandshake() + if err != nil { + log.Printf("Peer can't send outbound version ack", err) + + p.Stop() + } + } + + // Run the outbound handler in a new goroutine + go p.HandleOutbound() + // Run the inbound handler in a new goroutine + go p.HandleInbound() +} + +func (p *Peer) Stop() { + if atomic.AddInt32(&p.disconnect, 1) != 1 { + return + } + + close(p.quit) + if atomic.LoadInt32(&p.connected) != 0 { + p.conn.Close() + } + + log.Println("Peer shutdown") +} + +func (p *Peer) pushHandshake() error { + msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, ethutil.Encode([]interface{}{ + 1, 0, p.ethereum.Nonce, + })) + + p.QueueMessage(msg) + + return nil +} + +// Pushes the list of outbound peers to the client when requested +func (p *Peer) pushPeers() { + outPeers := make([]interface{}, len(p.ethereum.OutboundPeers())) + // Serialise each peer + for i, peer := range p.ethereum.OutboundPeers() { + outPeers[i] = peer.RlpEncode() + } + + // Send message to the peer with the known list of connected clients + msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers)) + + p.QueueMessage(msg) +} + +func (p *Peer) handleHandshake(msg *ethwire.Msg) { + c := ethutil.Conv(msg.Data) + // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID] + if c.Get(2).AsUint() == p.ethereum.Nonce { + //if msg.Nonce == p.ethereum.Nonce { + log.Println("Peer connected to self, disconnecting") + + p.Stop() + + return + } + + p.versionKnown = true + + // If this is an inbound connection send an ack back + if p.inbound { + err := p.pushHandshake() + if err != nil { + log.Println("Peer can't send ack back") + + p.Stop() + } + } +} + +func (p *Peer) RlpEncode() []byte { + host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + if err != nil { + return nil + } + + i, err := strconv.Atoi(prt) + if err != nil { + return nil + } + + port := ethutil.NumberToBytes(uint16(i), 16) + + return ethutil.Encode([]interface{}{host, port}) +} |