aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore12
-rw-r--r--ethereum.go213
-rw-r--r--peer.go303
3 files changed, 528 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 000000000..f725d58d1
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+# See http://help.github.com/ignore-files/ for more about ignoring files.
+#
+# If you find yourself ignoring temporary files generated by your text editor
+# or operating system, you probably want to add a global ignore instead:
+# git config --global core.excludesfile ~/.gitignore_global
+
+/tmp
+*/**/*un~
+*un~
+.DS_Store
+*/**/.DS_Store
+
diff --git a/ethereum.go b/ethereum.go
new file mode 100644
index 000000000..b1b675c88
--- /dev/null
+++ b/ethereum.go
@@ -0,0 +1,213 @@
+package eth
+
+import (
+ "container/list"
+ "github.com/ethereum/ethchain-go"
+ "github.com/ethereum/ethdb-go"
+ "github.com/ethereum/ethutil-go"
+ "github.com/ethereum/ethwire-go"
+ "log"
+ "net"
+ "sync/atomic"
+ "time"
+)
+
+func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
+ // Loop thru the peers and close them (if we had them)
+ for e := peers.Front(); e != nil; e = e.Next() {
+ if peer, ok := e.Value.(*Peer); ok {
+ callback(peer, e)
+ }
+ }
+}
+
+const (
+ processReapingTimeout = 60 // TODO increase
+)
+
+type Ethereum struct {
+ // Channel for shutting down the ethereum
+ shutdownChan chan bool
+ // DB interface
+ //db *ethdb.LDBDatabase
+ db *ethdb.MemDatabase
+ // Block manager for processing new blocks and managing the block chain
+ BlockManager *ethchain.BlockManager
+ // The transaction pool. Transaction can be pushed on this pool
+ // for later including in the blocks
+ TxPool *ethchain.TxPool
+ // Peers (NYI)
+ peers *list.List
+ // Nonce
+ Nonce uint64
+}
+
+func New() (*Ethereum, error) {
+ //db, err := ethdb.NewLDBDatabase()
+ db, err := ethdb.NewMemDatabase()
+ if err != nil {
+ return nil, err
+ }
+
+ ethutil.Config.Db = db
+
+ nonce, _ := ethutil.RandomUint64()
+ ethereum := &Ethereum{
+ shutdownChan: make(chan bool),
+ db: db,
+ peers: list.New(),
+ Nonce: nonce,
+ }
+ ethereum.TxPool = ethchain.NewTxPool()
+ ethereum.TxPool.Speaker = ethereum
+ ethereum.BlockManager = ethchain.NewBlockManager()
+
+ ethereum.TxPool.BlockManager = ethereum.BlockManager
+ ethereum.BlockManager.TransactionPool = ethereum.TxPool
+
+ return ethereum, nil
+}
+
+func (s *Ethereum) AddPeer(conn net.Conn) {
+ peer := NewPeer(conn, s, true)
+
+ if peer != nil {
+ s.peers.PushBack(peer)
+ peer.Start()
+
+ log.Println("Peer connected ::", conn.RemoteAddr())
+ }
+}
+
+func (s *Ethereum) ProcessPeerList(addrs []string) {
+ for _, addr := range addrs {
+ // TODO Probably requires some sanity checks
+ s.ConnectToPeer(addr)
+ }
+}
+
+func (s *Ethereum) ConnectToPeer(addr string) error {
+ peer := NewOutboundPeer(addr, s)
+
+ s.peers.PushBack(peer)
+
+ return nil
+}
+
+func (s *Ethereum) OutboundPeers() []*Peer {
+ // Create a new peer slice with at least the length of the total peers
+ outboundPeers := make([]*Peer, s.peers.Len())
+ length := 0
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ if !p.inbound {
+ outboundPeers[length] = p
+ length++
+ }
+ })
+
+ return outboundPeers[:length]
+}
+
+func (s *Ethereum) InboundPeers() []*Peer {
+ // Create a new peer slice with at least the length of the total peers
+ inboundPeers := make([]*Peer, s.peers.Len())
+ length := 0
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ if p.inbound {
+ inboundPeers[length] = p
+ length++
+ }
+ })
+
+ return inboundPeers[:length]
+}
+
+func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []byte) {
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ p.QueueMessage(ethwire.NewMessage(msgType, data))
+ })
+}
+
+func (s *Ethereum) ReapDeadPeers() {
+ for {
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
+ log.Println("Dead peer found .. reaping")
+
+ s.peers.Remove(e)
+ }
+ })
+
+ time.Sleep(processReapingTimeout * time.Second)
+ }
+}
+
+// Start the ethereum
+func (s *Ethereum) Start() {
+ // For now this function just blocks the main thread
+ ln, err := net.Listen("tcp", ":12345")
+ if err != nil {
+ // This is mainly for testing to create a "network"
+ if ethutil.Config.Debug {
+ log.Println("Connection listening disabled. Acting as client")
+
+ err = s.ConnectToPeer("localhost:12345")
+ if err != nil {
+ log.Println("Error starting ethereum", err)
+
+ s.Stop()
+ }
+ } else {
+ log.Fatal(err)
+ }
+ } else {
+ // Starting accepting connections
+ go func() {
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ log.Println(err)
+
+ continue
+ }
+
+ go s.AddPeer(conn)
+ }
+ }()
+ }
+
+ // Start the reaping processes
+ go s.ReapDeadPeers()
+
+ // Start the tx pool
+ s.TxPool.Start()
+
+ // TMP
+ /*
+ go func() {
+ for {
+ s.Broadcast("block", s.blockManager.bc.GenesisBlock().RlpEncode())
+
+ time.Sleep(1000 * time.Millisecond)
+ }
+ }()
+ */
+}
+
+func (s *Ethereum) Stop() {
+ // Close the database
+ defer s.db.Close()
+
+ eachPeer(s.peers, func(p *Peer, e *list.Element) {
+ p.Stop()
+ })
+
+ s.shutdownChan <- true
+
+ s.TxPool.Stop()
+}
+
+// This function will wait for a shutdown and resumes main thread execution
+func (s *Ethereum) WaitForShutdown() {
+ <-s.shutdownChan
+}
diff --git a/peer.go b/peer.go
new file mode 100644
index 000000000..ef9a05ed1
--- /dev/null
+++ b/peer.go
@@ -0,0 +1,303 @@
+package eth
+
+import (
+ "github.com/ethereum/ethutil-go"
+ "github.com/ethereum/ethwire-go"
+ "log"
+ "net"
+ "strconv"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ // The size of the output buffer for writing messages
+ outputBufferSize = 50
+)
+
+type Peer struct {
+ // Ethereum interface
+ ethereum *Ethereum
+ // Net connection
+ conn net.Conn
+ // Output queue which is used to communicate and handle messages
+ outputQueue chan *ethwire.Msg
+ // Quit channel
+ quit chan bool
+ // Determines whether it's an inbound or outbound peer
+ inbound bool
+ // Flag for checking the peer's connectivity state
+ connected int32
+ disconnect int32
+ // Last known message send
+ lastSend time.Time
+ // Indicated whether a verack has been send or not
+ // This flag is used by writeMessage to check if messages are allowed
+ // to be send or not. If no version is known all messages are ignored.
+ versionKnown bool
+
+ // Last received pong message
+ lastPong int64
+ // Indicates whether a MsgGetPeersTy was requested of the peer
+ // this to prevent receiving false peers.
+ requestedPeerList bool
+}
+
+func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
+ return &Peer{
+ outputQueue: make(chan *ethwire.Msg, outputBufferSize),
+ quit: make(chan bool),
+ ethereum: ethereum,
+ conn: conn,
+ inbound: inbound,
+ disconnect: 0,
+ connected: 1,
+ }
+}
+
+func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer {
+ p := &Peer{
+ outputQueue: make(chan *ethwire.Msg, outputBufferSize),
+ quit: make(chan bool),
+ ethereum: ethereum,
+ inbound: false,
+ connected: 0,
+ disconnect: 0,
+ }
+
+ // Set up the connection in another goroutine so we don't block the main thread
+ go func() {
+ conn, err := net.Dial("tcp", addr)
+ if err != nil {
+ p.Stop()
+ }
+ p.conn = conn
+
+ // Atomically set the connection state
+ atomic.StoreInt32(&p.connected, 1)
+ atomic.StoreInt32(&p.disconnect, 0)
+
+ log.Println("Connected to peer ::", conn.RemoteAddr())
+
+ p.Start()
+ }()
+
+ return p
+}
+
+// Outputs any RLP encoded data to the peer
+func (p *Peer) QueueMessage(msg *ethwire.Msg) {
+ p.outputQueue <- msg
+}
+
+func (p *Peer) writeMessage(msg *ethwire.Msg) {
+ // Ignore the write if we're not connected
+ if atomic.LoadInt32(&p.connected) != 1 {
+ return
+ }
+
+ if !p.versionKnown {
+ switch msg.Type {
+ case ethwire.MsgHandshakeTy: // Ok
+ default: // Anything but ack is allowed
+ return
+ }
+ }
+
+ err := ethwire.WriteMessage(p.conn, msg)
+ if err != nil {
+ log.Println("Can't send message:", err)
+ // Stop the client if there was an error writing to it
+ p.Stop()
+ return
+ }
+}
+
+// Outbound message handler. Outbound messages are handled here
+func (p *Peer) HandleOutbound() {
+ // The ping timer. Makes sure that every 2 minutes a ping is send to the peer
+ tickleTimer := time.NewTicker(2 * time.Minute)
+out:
+ for {
+ select {
+ // Main message queue. All outbound messages are processed through here
+ case msg := <-p.outputQueue:
+ p.writeMessage(msg)
+
+ p.lastSend = time.Now()
+
+ case <-tickleTimer.C:
+ p.writeMessage(&ethwire.Msg{Type: ethwire.MsgPingTy})
+
+ // Break out of the for loop if a quit message is posted
+ case <-p.quit:
+ break out
+ }
+ }
+
+clean:
+ // This loop is for draining the output queue and anybody waiting for us
+ for {
+ select {
+ case <-p.outputQueue:
+ // TODO
+ default:
+ break clean
+ }
+ }
+}
+
+// Inbound handler. Inbound messages are received here and passed to the appropriate methods
+func (p *Peer) HandleInbound() {
+
+out:
+ for atomic.LoadInt32(&p.disconnect) == 0 {
+ // Wait for a message from the peer
+ msg, err := ethwire.ReadMessage(p.conn)
+ if err != nil {
+ log.Println(err)
+
+ break out
+ }
+
+ if ethutil.Config.Debug {
+ log.Printf("Received %s\n", msg.Type.String())
+ }
+
+ switch msg.Type {
+ case ethwire.MsgHandshakeTy:
+ // Version message
+ p.handleHandshake(msg)
+ case ethwire.MsgBlockTy:
+ err := p.ethereum.BlockManager.ProcessBlock(ethutil.NewBlock(msg.Data))
+ if err != nil {
+ log.Println(err)
+ }
+ case ethwire.MsgTxTy:
+ p.ethereum.TxPool.QueueTransaction(ethutil.NewTransactionFromData(msg.Data))
+ case ethwire.MsgInvTy:
+ case ethwire.MsgGetPeersTy:
+ p.requestedPeerList = true
+ // Peer asked for list of connected peers
+ p.pushPeers()
+ case ethwire.MsgPeersTy:
+ // Received a list of peers (probably because MsgGetPeersTy was send)
+ // Only act on message if we actually requested for a peers list
+ if p.requestedPeerList {
+ data := ethutil.Conv(msg.Data)
+ // Create new list of possible peers for the ethereum to process
+ peers := make([]string, data.Length())
+ // Parse each possible peer
+ for i := 0; i < data.Length(); i++ {
+ peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint()))
+ }
+
+ // Connect to the list of peers
+ p.ethereum.ProcessPeerList(peers)
+ // Mark unrequested again
+ p.requestedPeerList = false
+ }
+ case ethwire.MsgPingTy:
+ // Respond back with pong
+ p.QueueMessage(&ethwire.Msg{Type: ethwire.MsgPongTy})
+ case ethwire.MsgPongTy:
+ p.lastPong = time.Now().Unix()
+ }
+ }
+
+ p.Stop()
+}
+
+func (p *Peer) Start() {
+ if !p.inbound {
+ err := p.pushHandshake()
+ if err != nil {
+ log.Printf("Peer can't send outbound version ack", err)
+
+ p.Stop()
+ }
+ }
+
+ // Run the outbound handler in a new goroutine
+ go p.HandleOutbound()
+ // Run the inbound handler in a new goroutine
+ go p.HandleInbound()
+}
+
+func (p *Peer) Stop() {
+ if atomic.AddInt32(&p.disconnect, 1) != 1 {
+ return
+ }
+
+ close(p.quit)
+ if atomic.LoadInt32(&p.connected) != 0 {
+ p.conn.Close()
+ }
+
+ log.Println("Peer shutdown")
+}
+
+func (p *Peer) pushHandshake() error {
+ msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, ethutil.Encode([]interface{}{
+ 1, 0, p.ethereum.Nonce,
+ }))
+
+ p.QueueMessage(msg)
+
+ return nil
+}
+
+// Pushes the list of outbound peers to the client when requested
+func (p *Peer) pushPeers() {
+ outPeers := make([]interface{}, len(p.ethereum.OutboundPeers()))
+ // Serialise each peer
+ for i, peer := range p.ethereum.OutboundPeers() {
+ outPeers[i] = peer.RlpEncode()
+ }
+
+ // Send message to the peer with the known list of connected clients
+ msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers))
+
+ p.QueueMessage(msg)
+}
+
+func (p *Peer) handleHandshake(msg *ethwire.Msg) {
+ c := ethutil.Conv(msg.Data)
+ // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID]
+ if c.Get(2).AsUint() == p.ethereum.Nonce {
+ //if msg.Nonce == p.ethereum.Nonce {
+ log.Println("Peer connected to self, disconnecting")
+
+ p.Stop()
+
+ return
+ }
+
+ p.versionKnown = true
+
+ // If this is an inbound connection send an ack back
+ if p.inbound {
+ err := p.pushHandshake()
+ if err != nil {
+ log.Println("Peer can't send ack back")
+
+ p.Stop()
+ }
+ }
+}
+
+func (p *Peer) RlpEncode() []byte {
+ host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String())
+ if err != nil {
+ return nil
+ }
+
+ i, err := strconv.Atoi(prt)
+ if err != nil {
+ return nil
+ }
+
+ port := ethutil.NumberToBytes(uint16(i), 16)
+
+ return ethutil.Encode([]interface{}{host, port})
+}