From 4914a78c8c650d7fc74570f25a682598aaeb6973 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 31 Oct 2014 14:53:42 +0100 Subject: ethwire => wire --- block_pool.go | 6 +- chain/state_manager.go | 6 +- chain/transaction_pool.go | 4 +- cmd/mist/gui.go | 6 +- cmd/utils/cmd.go | 8 +- ethereum.go | 14 +-- ethminer/miner.go | 217 ---------------------------------------- ethwire/.gitignore | 12 --- ethwire/README.md | 36 ------- ethwire/client_identity.go | 56 ----------- ethwire/client_identity_test.go | 30 ------ ethwire/messages2.go | 199 ------------------------------------ ethwire/messaging.go | 179 --------------------------------- peer.go | 90 ++++++++--------- wire/.gitignore | 12 +++ wire/README.md | 36 +++++++ wire/client_identity.go | 56 +++++++++++ wire/client_identity_test.go | 30 ++++++ wire/messages2.go | 199 ++++++++++++++++++++++++++++++++++++ wire/messaging.go | 179 +++++++++++++++++++++++++++++++++ 20 files changed, 579 insertions(+), 796 deletions(-) delete mode 100644 ethminer/miner.go delete mode 100644 ethwire/.gitignore delete mode 100644 ethwire/README.md delete mode 100644 ethwire/client_identity.go delete mode 100644 ethwire/client_identity_test.go delete mode 100644 ethwire/messages2.go delete mode 100644 ethwire/messaging.go create mode 100644 wire/.gitignore create mode 100644 wire/README.md create mode 100644 wire/client_identity.go create mode 100644 wire/client_identity_test.go create mode 100644 wire/messages2.go create mode 100644 wire/messaging.go diff --git a/block_pool.go b/block_pool.go index 1cf3ab907..0e182623f 100644 --- a/block_pool.go +++ b/block_pool.go @@ -11,8 +11,8 @@ import ( "github.com/ethereum/go-ethereum/chain" "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/wire" ) var poollogger = logger.NewLogger("BPOOL") @@ -103,7 +103,7 @@ func (self *BlockPool) FetchHashes(peer *Peer) bool { const amount = 256 peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) - peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) + peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) } return true @@ -150,7 +150,7 @@ func (self *BlockPool) addBlock(b *chain.Block, peer *Peer, newBlock bool) { fmt.Println("3.", !self.fetchingHashes) if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) - peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) + peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) } } } else if self.pool[hash] != nil { diff --git a/chain/state_manager.go b/chain/state_manager.go index b6bfbc22f..f624f0097 100644 --- a/chain/state_manager.go +++ b/chain/state_manager.go @@ -11,10 +11,10 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/state" + "github.com/ethereum/go-ethereum/wire" ) var statelogger = logger.NewLogger("BLOCK") @@ -35,13 +35,13 @@ type EthManager interface { StateManager() *StateManager ChainManager() *ChainManager TxPool() *TxPool - Broadcast(msgType ethwire.MsgType, data []interface{}) + Broadcast(msgType wire.MsgType, data []interface{}) PeerCount() int IsMining() bool IsListening() bool Peers() *list.List KeyManager() *crypto.KeyManager - ClientIdentity() ethwire.ClientIdentity + ClientIdentity() wire.ClientIdentity Db() ethutil.Database EventMux() *event.TypeMux } diff --git a/chain/transaction_pool.go b/chain/transaction_pool.go index 3e3787eed..a7c85e802 100644 --- a/chain/transaction_pool.go +++ b/chain/transaction_pool.go @@ -7,9 +7,9 @@ import ( "math/big" "sync" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/state" + "github.com/ethereum/go-ethereum/wire" ) var txplogger = logger.NewLogger("TXP") @@ -93,7 +93,7 @@ func (pool *TxPool) addTransaction(tx *Transaction) { pool.pool.PushBack(tx) // Broadcast the transaction to the rest of the peers - pool.Ethereum.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()}) + pool.Ethereum.Broadcast(wire.MsgTxTy, []interface{}{tx.RlpData()}) } func (pool *TxPool) ValidateTransaction(tx *Transaction) error { diff --git a/cmd/mist/gui.go b/cmd/mist/gui.go index 2dfdd104f..d309e0a9b 100644 --- a/cmd/mist/gui.go +++ b/cmd/mist/gui.go @@ -35,8 +35,8 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethminer" "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/wire" "github.com/ethereum/go-ethereum/xeth" "gopkg.in/qml.v1" ) @@ -87,7 +87,7 @@ type Gui struct { pipe *xeth.JSXEth Session string - clientIdentity *ethwire.SimpleClientIdentity + clientIdentity *wire.SimpleClientIdentity config *ethutil.ConfigManager plugins map[string]plugin @@ -97,7 +97,7 @@ type Gui struct { } // Create GUI, but doesn't start it -func NewWindow(ethereum *eth.Ethereum, config *ethutil.ConfigManager, clientIdentity *ethwire.SimpleClientIdentity, session string, logLevel int) *Gui { +func NewWindow(ethereum *eth.Ethereum, config *ethutil.ConfigManager, clientIdentity *wire.SimpleClientIdentity, session string, logLevel int) *Gui { db, err := ethdb.NewLDBDatabase("tx_database") if err != nil { panic(err) diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index d9914a6f4..5313b8fad 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -18,9 +18,9 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethminer" "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/wire" "github.com/ethereum/go-ethereum/xeth" ) @@ -144,12 +144,12 @@ func NewDatabase() ethutil.Database { return db } -func NewClientIdentity(clientIdentifier, version, customIdentifier string) *ethwire.SimpleClientIdentity { +func NewClientIdentity(clientIdentifier, version, customIdentifier string) *wire.SimpleClientIdentity { clilogger.Infoln("identity created") - return ethwire.NewSimpleClientIdentity(clientIdentifier, version, customIdentifier) + return wire.NewSimpleClientIdentity(clientIdentifier, version, customIdentifier) } -func NewEthereum(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *crypto.KeyManager, usePnp bool, OutboundPort string, MaxPeer int) *eth.Ethereum { +func NewEthereum(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, usePnp bool, OutboundPort string, MaxPeer int) *eth.Ethereum { ethereum, err := eth.New(db, clientIdentity, keyManager, eth.CapDefault, usePnp) if err != nil { clilogger.Fatalln("eth start err:", err) diff --git a/ethereum.go b/ethereum.go index d6f664349..d4abeed26 100644 --- a/ethereum.go +++ b/ethereum.go @@ -17,11 +17,11 @@ import ( "github.com/ethereum/go-ethereum/chain" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/state" + "github.com/ethereum/go-ethereum/wire" ) const ( @@ -88,7 +88,7 @@ type Ethereum struct { keyManager *crypto.KeyManager - clientIdentity ethwire.ClientIdentity + clientIdentity wire.ClientIdentity isUpToDate bool @@ -97,7 +97,7 @@ type Ethereum struct { filters map[int]*chain.Filter } -func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { +func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { var err error var nat NAT @@ -142,7 +142,7 @@ func (s *Ethereum) KeyManager() *crypto.KeyManager { return s.keyManager } -func (s *Ethereum) ClientIdentity() ethwire.ClientIdentity { +func (s *Ethereum) ClientIdentity() wire.ClientIdentity { return s.clientIdentity } @@ -338,12 +338,12 @@ func (s *Ethereum) InOutPeers() []*Peer { return inboundPeers[:length] } -func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data []interface{}) { - msg := ethwire.NewMessage(msgType, data) +func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) { + msg := wire.NewMessage(msgType, data) s.BroadcastMsg(msg) } -func (s *Ethereum) BroadcastMsg(msg *ethwire.Msg) { +func (s *Ethereum) BroadcastMsg(msg *wire.Msg) { eachPeer(s.peers, func(p *Peer, e *list.Element) { p.QueueMessage(msg) }) diff --git a/ethminer/miner.go b/ethminer/miner.go deleted file mode 100644 index 42c989eee..000000000 --- a/ethminer/miner.go +++ /dev/null @@ -1,217 +0,0 @@ -package ethminer - -import ( - "bytes" - "sort" - - "github.com/ethereum/go-ethereum/chain" - "github.com/ethereum/go-ethereum/ethwire" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/logger" -) - -var minerlogger = logger.NewLogger("MINER") - -type Miner struct { - pow chain.PoW - ethereum chain.EthManager - coinbase []byte - txs chain.Transactions - uncles []*chain.Block - block *chain.Block - - events event.Subscription - powQuitChan chan struct{} - powDone chan struct{} - - turbo bool -} - -const ( - Started = iota - Stopped -) - -type Event struct { - Type int // Started || Stopped - Miner *Miner -} - -func (self *Miner) GetPow() chain.PoW { - return self.pow -} - -func NewDefaultMiner(coinbase []byte, ethereum chain.EthManager) *Miner { - miner := Miner{ - pow: &chain.EasyPow{}, - ethereum: ethereum, - coinbase: coinbase, - } - - return &miner -} - -func (self *Miner) ToggleTurbo() { - self.turbo = !self.turbo - - self.pow.Turbo(self.turbo) -} - -func (miner *Miner) Start() { - - // Insert initial TXs in our little miner 'pool' - miner.txs = miner.ethereum.TxPool().Flush() - miner.block = miner.ethereum.ChainManager().NewBlock(miner.coinbase) - - mux := miner.ethereum.EventMux() - miner.events = mux.Subscribe(chain.NewBlockEvent{}, chain.TxPreEvent{}) - - // Prepare inital block - //miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State()) - go miner.listener() - - minerlogger.Infoln("Started") - mux.Post(Event{Started, miner}) -} - -func (miner *Miner) Stop() { - minerlogger.Infoln("Stopping...") - miner.events.Unsubscribe() - miner.ethereum.EventMux().Post(Event{Stopped, miner}) -} - -func (miner *Miner) listener() { - miner.startMining() - - for { - select { - case event := <-miner.events.Chan(): - switch event := event.(type) { - case chain.NewBlockEvent: - miner.stopMining() - - block := event.Block - //minerlogger.Infoln("Got new block via Reactor") - if bytes.Compare(miner.ethereum.ChainManager().CurrentBlock.Hash(), block.Hash()) == 0 { - // TODO: Perhaps continue mining to get some uncle rewards - //minerlogger.Infoln("New top block found resetting state") - - // Filter out which Transactions we have that were not in this block - var newtxs []*chain.Transaction - for _, tx := range miner.txs { - found := false - for _, othertx := range block.Transactions() { - if bytes.Compare(tx.Hash(), othertx.Hash()) == 0 { - found = true - } - } - if found == false { - newtxs = append(newtxs, tx) - } - } - miner.txs = newtxs - } else { - if bytes.Compare(block.PrevHash, miner.ethereum.ChainManager().CurrentBlock.PrevHash) == 0 { - minerlogger.Infoln("Adding uncle block") - miner.uncles = append(miner.uncles, block) - } - } - miner.startMining() - - case chain.TxPreEvent: - miner.stopMining() - - found := false - for _, ctx := range miner.txs { - if found = bytes.Compare(ctx.Hash(), event.Tx.Hash()) == 0; found { - break - } - - miner.startMining() - } - if found == false { - // Undo all previous commits - miner.block.Undo() - // Apply new transactions - miner.txs = append(miner.txs, event.Tx) - } - } - - case <-miner.powDone: - miner.startMining() - } - } -} - -func (miner *Miner) startMining() { - if miner.powDone == nil { - miner.powDone = make(chan struct{}) - } - miner.powQuitChan = make(chan struct{}) - go miner.mineNewBlock() -} - -func (miner *Miner) stopMining() { - println("stop mining") - _, isopen := <-miner.powQuitChan - if isopen { - close(miner.powQuitChan) - } - //<-miner.powDone -} - -func (self *Miner) mineNewBlock() { - stateManager := self.ethereum.StateManager() - - self.block = self.ethereum.ChainManager().NewBlock(self.coinbase) - - // Apply uncles - if len(self.uncles) > 0 { - self.block.SetUncles(self.uncles) - } - - // Sort the transactions by nonce in case of odd network propagation - sort.Sort(chain.TxByNonce{self.txs}) - - // Accumulate all valid transactions and apply them to the new state - // Error may be ignored. It's not important during mining - parent := self.ethereum.ChainManager().GetBlock(self.block.PrevHash) - coinbase := self.block.State().GetOrNewStateObject(self.block.Coinbase) - coinbase.SetGasPool(self.block.CalcGasLimit(parent)) - receipts, txs, unhandledTxs, erroneous, err := stateManager.ProcessTransactions(coinbase, self.block.State(), self.block, self.block, self.txs) - if err != nil { - minerlogger.Debugln(err) - } - self.ethereum.TxPool().RemoveSet(erroneous) - self.txs = append(txs, unhandledTxs...) - - self.block.SetTransactions(txs) - self.block.SetReceipts(receipts) - - // Accumulate the rewards included for this block - stateManager.AccumelateRewards(self.block.State(), self.block, parent) - - self.block.State().Update() - - minerlogger.Infof("Mining on block. Includes %v transactions", len(self.txs)) - - // Find a valid nonce - nonce := self.pow.Search(self.block, self.powQuitChan) - if nonce != nil { - self.block.Nonce = nonce - err := self.ethereum.StateManager().Process(self.block) - if err != nil { - minerlogger.Infoln(err) - } else { - self.ethereum.Broadcast(ethwire.MsgBlockTy, []interface{}{self.block.Value().Val}) - minerlogger.Infof("🔨 Mined block %x\n", self.block.Hash()) - minerlogger.Infoln(self.block) - // Gather the new batch of transactions currently in the tx pool - self.txs = self.ethereum.TxPool().CurrentTransactions() - self.ethereum.EventMux().Post(chain.NewBlockEvent{self.block}) - } - - // Continue mining on the next block - self.startMining() - } -} diff --git a/ethwire/.gitignore b/ethwire/.gitignore deleted file mode 100644 index f725d58d1..000000000 --- a/ethwire/.gitignore +++ /dev/null @@ -1,12 +0,0 @@ -# See http://help.github.com/ignore-files/ for more about ignoring files. -# -# If you find yourself ignoring temporary files generated by your text editor -# or operating system, you probably want to add a global ignore instead: -# git config --global core.excludesfile ~/.gitignore_global - -/tmp -*/**/*un~ -*un~ -.DS_Store -*/**/.DS_Store - diff --git a/ethwire/README.md b/ethwire/README.md deleted file mode 100644 index 7f63688b3..000000000 --- a/ethwire/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# ethwire - -The ethwire package contains the ethereum wire protocol. The ethwire -package is required to write and read from the ethereum network. - -# Installation - -`go get github.com/ethereum/ethwire-go` - -# Messaging overview - -The Ethereum Wire protocol defines the communication between the nodes -running Ethereum. Further reader reading can be done on the -[Wiki](http://wiki.ethereum.org/index.php/Wire_Protocol). - -# Reading Messages - -```go -// Read and validate the next eth message from the provided connection. -// returns a error message with the details. -msg, err := ethwire.ReadMessage(conn) -if err != nil { - // Handle error -} -``` - -# Writing Messages - -```go -// Constructs a message which can be interpreted by the eth network. -// Write the inventory to network -err := ethwire.WriteMessage(conn, &Msg{ - Type: ethwire.MsgInvTy, - Data : []interface{}{...}, -}) -``` diff --git a/ethwire/client_identity.go b/ethwire/client_identity.go deleted file mode 100644 index ceaa9fe83..000000000 --- a/ethwire/client_identity.go +++ /dev/null @@ -1,56 +0,0 @@ -package ethwire - -import ( - "fmt" - "runtime" -) - -// should be used in Peer handleHandshake, incorporate Caps, ProtocolVersion, Pubkey etc. -type ClientIdentity interface { - String() string -} - -type SimpleClientIdentity struct { - clientIdentifier string - version string - customIdentifier string - os string - implementation string -} - -func NewSimpleClientIdentity(clientIdentifier string, version string, customIdentifier string) *SimpleClientIdentity { - clientIdentity := &SimpleClientIdentity{ - clientIdentifier: clientIdentifier, - version: version, - customIdentifier: customIdentifier, - os: runtime.GOOS, - implementation: runtime.Version(), - } - - return clientIdentity -} - -func (c *SimpleClientIdentity) init() { -} - -func (c *SimpleClientIdentity) String() string { - var id string - if len(c.customIdentifier) > 0 { - id = "/" + c.customIdentifier - } - - return fmt.Sprintf("%s/v%s%s/%s/%s", - c.clientIdentifier, - c.version, - id, - c.os, - c.implementation) -} - -func (c *SimpleClientIdentity) SetCustomIdentifier(customIdentifier string) { - c.customIdentifier = customIdentifier -} - -func (c *SimpleClientIdentity) GetCustomIdentifier() string { - return c.customIdentifier -} diff --git a/ethwire/client_identity_test.go b/ethwire/client_identity_test.go deleted file mode 100644 index 1724fe57b..000000000 --- a/ethwire/client_identity_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package ethwire - -import ( - "fmt" - "runtime" - "testing" -) - -func TestClientIdentity(t *testing.T) { - clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test") - clientString := clientIdentity.String() - expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version()) - if clientString != expected { - t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) - } - customIdentifier := clientIdentity.GetCustomIdentifier() - if customIdentifier != "test" { - t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test', got %q", customIdentifier) - } - clientIdentity.SetCustomIdentifier("test2") - customIdentifier = clientIdentity.GetCustomIdentifier() - if customIdentifier != "test2" { - t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test2', got %q", customIdentifier) - } - clientString = clientIdentity.String() - expected = fmt.Sprintf("Ethereum(G)/v0.5.16/test2/%s/%s", runtime.GOOS, runtime.Version()) - if clientString != expected { - t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) - } -} diff --git a/ethwire/messages2.go b/ethwire/messages2.go deleted file mode 100644 index ebbc7c912..000000000 --- a/ethwire/messages2.go +++ /dev/null @@ -1,199 +0,0 @@ -package ethwire - -import ( - "bytes" - "errors" - "fmt" - "net" - "time" - - "github.com/ethereum/go-ethereum/ethutil" -) - -// The connection object allows you to set up a connection to the Ethereum network. -// The Connection object takes care of all encoding and sending objects properly over -// the network. -type Connection struct { - conn net.Conn - nTimeout time.Duration - pendingMessages Messages -} - -// Create a new connection to the Ethereum network -func New(conn net.Conn) *Connection { - return &Connection{conn: conn, nTimeout: 500} -} - -// Read, reads from the network. It will block until the next message is received. -func (self *Connection) Read() *Msg { - if len(self.pendingMessages) == 0 { - self.readMessages() - } - - ret := self.pendingMessages[0] - self.pendingMessages = self.pendingMessages[1:] - - return ret - -} - -// Write to the Ethereum network specifying the type of the message and -// the data. Data can be of type RlpEncodable or []interface{}. Returns -// nil or if something went wrong an error. -func (self *Connection) Write(typ MsgType, v ...interface{}) error { - var pack []byte - - slice := [][]interface{}{[]interface{}{byte(typ)}} - for _, value := range v { - if encodable, ok := value.(ethutil.RlpEncodeDecode); ok { - slice = append(slice, encodable.RlpValue()) - } else if raw, ok := value.([]interface{}); ok { - slice = append(slice, raw) - } else { - panic(fmt.Sprintf("Unable to 'write' object of type %T", value)) - } - } - - // Encode the type and the (RLP encoded) data for sending over the wire - encoded := ethutil.NewValue(slice).Encode() - payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) - - // Write magic token and payload length (first 8 bytes) - pack = append(MagicToken, payloadLength...) - pack = append(pack, encoded...) - - // Write to the connection - _, err := self.conn.Write(pack) - if err != nil { - return err - } - - return nil -} - -func (self *Connection) readMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { - if len(data) == 0 { - return nil, nil, true, nil - } - - if len(data) <= 8 { - return nil, remaining, false, errors.New("Invalid message") - } - - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, data[:4]) != 0 { - return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) - } - - messageLength := ethutil.BytesToNumber(data[4:8]) - remaining = data[8+messageLength:] - if int(messageLength) > len(data[8:]) { - return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) - } - - message := data[8 : 8+messageLength] - decoder := ethutil.NewValueFromBytes(message) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) - - msg = &Msg{ - Type: MsgType(t), - Data: d, - } - - return -} - -// The basic message reader waits for data on the given connection, decoding -// and doing a few sanity checks such as if there's a data type and -// unmarhals the given data -func (self *Connection) readMessages() (err error) { - // The recovering function in case anything goes horribly wrong - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("ethwire.ReadMessage error: %v", r) - } - }() - - // Buff for writing network message to - //buff := make([]byte, 1440) - var buff []byte - var totalBytes int - for { - // Give buffering some time - self.conn.SetReadDeadline(time.Now().Add(self.nTimeout * time.Millisecond)) - // Create a new temporarily buffer - b := make([]byte, 1440) - // Wait for a message from this peer - n, _ := self.conn.Read(b) - if err != nil && n == 0 { - if err.Error() != "EOF" { - fmt.Println("err now", err) - return err - } else { - break - } - - // Messages can't be empty - } else if n == 0 { - break - } - - buff = append(buff, b[:n]...) - totalBytes += n - } - - // Reslice buffer - buff = buff[:totalBytes] - msg, remaining, done, err := self.readMessage(buff) - for ; done != true; msg, remaining, done, err = self.readMessage(remaining) { - //log.Println("rx", msg) - - if msg != nil { - self.pendingMessages = append(self.pendingMessages, msg) - } - } - - return -} - -func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { - if len(data) == 0 { - return nil, nil, true, nil - } - - if len(data) <= 8 { - return nil, remaining, false, errors.New("Invalid message") - } - - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, data[:4]) != 0 { - return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) - } - - messageLength := ethutil.BytesToNumber(data[4:8]) - remaining = data[8+messageLength:] - if int(messageLength) > len(data[8:]) { - return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) - } - - message := data[8 : 8+messageLength] - decoder := ethutil.NewValueFromBytes(message) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) - - msg = &Msg{ - Type: MsgType(t), - Data: d, - } - - return -} - -func bufferedRead(conn net.Conn) ([]byte, error) { - return nil, nil -} diff --git a/ethwire/messaging.go b/ethwire/messaging.go deleted file mode 100644 index f1757f40f..000000000 --- a/ethwire/messaging.go +++ /dev/null @@ -1,179 +0,0 @@ -// Package ethwire provides low level access to the Ethereum network and allows -// you to broadcast data over the network. -package ethwire - -import ( - "bytes" - "fmt" - "net" - "time" - - "github.com/ethereum/go-ethereum/ethutil" -) - -// Connection interface describing the methods required to implement the wire protocol. -type Conn interface { - Write(typ MsgType, v ...interface{}) error - Read() *Msg -} - -// The magic token which should be the first 4 bytes of every message and can be used as separator between messages. -var MagicToken = []byte{34, 64, 8, 145} - -type MsgType byte - -const ( - // Values are given explicitly instead of by iota because these values are - // defined by the wire protocol spec; it is easier for humans to ensure - // correctness when values are explicit. - MsgHandshakeTy = 0x00 - MsgDiscTy = 0x01 - MsgPingTy = 0x02 - MsgPongTy = 0x03 - MsgGetPeersTy = 0x04 - MsgPeersTy = 0x05 - - MsgStatusTy = 0x10 - //MsgGetTxsTy = 0x11 - MsgTxTy = 0x12 - MsgGetBlockHashesTy = 0x13 - MsgBlockHashesTy = 0x14 - MsgGetBlocksTy = 0x15 - MsgBlockTy = 0x16 - MsgNewBlockTy = 0x17 -) - -var msgTypeToString = map[MsgType]string{ - MsgHandshakeTy: "Handshake", - MsgDiscTy: "Disconnect", - MsgPingTy: "Ping", - MsgPongTy: "Pong", - MsgGetPeersTy: "Get peers", - MsgStatusTy: "Status", - MsgPeersTy: "Peers", - MsgTxTy: "Transactions", - MsgBlockTy: "Blocks", - //MsgGetTxsTy: "Get Txs", - MsgGetBlockHashesTy: "Get block hashes", - MsgBlockHashesTy: "Block hashes", - MsgGetBlocksTy: "Get blocks", -} - -func (mt MsgType) String() string { - return msgTypeToString[mt] -} - -type Msg struct { - Type MsgType // Specifies how the encoded data should be interpreted - //Data []byte - Data *ethutil.Value -} - -func NewMessage(msgType MsgType, data interface{}) *Msg { - return &Msg{ - Type: msgType, - Data: ethutil.NewValue(data), - } -} - -type Messages []*Msg - -// The basic message reader waits for data on the given connection, decoding -// and doing a few sanity checks such as if there's a data type and -// unmarhals the given data -func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { - // The recovering function in case anything goes horribly wrong - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("ethwire.ReadMessage error: %v", r) - } - }() - - var ( - buff []byte - messages [][]byte - msgLength int - ) - - for { - // Give buffering some time - conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) - // Create a new temporarily buffer - b := make([]byte, 1440) - n, _ := conn.Read(b) - if err != nil && n == 0 { - if err.Error() != "EOF" { - fmt.Println("err now", err) - return nil, err - } else { - break - } - } - - if n == 0 && len(buff) == 0 { - // If there's nothing on the wire wait for a bit - time.Sleep(200 * time.Millisecond) - - continue - } - - buff = append(buff, b[:n]...) - if msgLength == 0 { - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, buff[:4]) != 0 { - return nil, fmt.Errorf("MagicToken mismatch. Received %v", buff[:4]) - } - - // Read the length of the message - msgLength = int(ethutil.BytesToNumber(buff[4:8])) - - // Remove the token and length - buff = buff[8:] - } - - if len(buff) >= msgLength { - messages = append(messages, buff[:msgLength]) - buff = buff[msgLength:] - msgLength = 0 - - if len(buff) == 0 { - break - } - } - } - - for _, m := range messages { - decoder := ethutil.NewValueFromBytes(m) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) - - msgs = append(msgs, &Msg{Type: MsgType(t), Data: d}) - } - - return -} - -// The basic message writer takes care of writing data over the given -// connection and does some basic error checking -func WriteMessage(conn net.Conn, msg *Msg) error { - var pack []byte - - // Encode the type and the (RLP encoded) data for sending over the wire - encoded := ethutil.NewValue(append([]interface{}{byte(msg.Type)}, msg.Data.Slice()...)).Encode() - payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) - - // Write magic token and payload length (first 8 bytes) - pack = append(MagicToken, payloadLength...) - pack = append(pack, encoded...) - //fmt.Printf("payload %v (%v) %q\n", msg.Type, conn.RemoteAddr(), encoded) - - // Write to the connection - _, err := conn.Write(pack) - if err != nil { - return err - } - - return nil -} diff --git a/peer.go b/peer.go index 36db68023..50daceb83 100644 --- a/peer.go +++ b/peer.go @@ -14,8 +14,8 @@ import ( "github.com/ethereum/go-ethereum/chain" "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/ethwire" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/wire" ) var peerlogger = logger.NewLogger("PEER") @@ -112,7 +112,7 @@ type Peer struct { // Net connection conn net.Conn // Output queue which is used to communicate and handle messages - outputQueue chan *ethwire.Msg + outputQueue chan *wire.Msg // Quit channel quit chan bool // Determines whether it's an inbound or outbound peer @@ -164,7 +164,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { pubkey := ethereum.KeyManager().PublicKey()[1:] return &Peer{ - outputQueue: make(chan *ethwire.Msg, outputBufferSize), + outputQueue: make(chan *wire.Msg, outputBufferSize), quit: make(chan bool), ethereum: ethereum, conn: conn, @@ -184,7 +184,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { p := &Peer{ - outputQueue: make(chan *ethwire.Msg, outputBufferSize), + outputQueue: make(chan *wire.Msg, outputBufferSize), quit: make(chan bool), ethereum: ethereum, inbound: false, @@ -266,14 +266,14 @@ func (p *Peer) SetVersion(version string) { } // Outputs any RLP encoded data to the peer -func (p *Peer) QueueMessage(msg *ethwire.Msg) { +func (p *Peer) QueueMessage(msg *wire.Msg) { if atomic.LoadInt32(&p.connected) != 1 { return } p.outputQueue <- msg } -func (p *Peer) writeMessage(msg *ethwire.Msg) { +func (p *Peer) writeMessage(msg *wire.Msg) { // Ignore the write if we're not connected if atomic.LoadInt32(&p.connected) != 1 { return @@ -281,7 +281,7 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { if !p.versionKnown { switch msg.Type { - case ethwire.MsgHandshakeTy: // Ok + case wire.MsgHandshakeTy: // Ok default: // Anything but ack is allowed return } @@ -289,7 +289,7 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { /* if !p.statusKnown { switch msg.Type { - case ethwire.MsgStatusTy: // Ok + case wire.MsgStatusTy: // Ok default: // Anything but ack is allowed return } @@ -299,7 +299,7 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { peerlogger.DebugDetailf("(%v) <= %v\n", p.conn.RemoteAddr(), formatMessage(msg)) - err := ethwire.WriteMessage(p.conn, msg) + err := wire.WriteMessage(p.conn, msg) if err != nil { peerlogger.Debugln(" Can't send message:", err) // Stop the client if there was an error writing to it @@ -322,7 +322,7 @@ out: case msg := <-p.outputQueue: if !p.statusKnown { switch msg.Type { - case ethwire.MsgTxTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockTy: + case wire.MsgTxTy, wire.MsgGetBlockHashesTy, wire.MsgBlockHashesTy, wire.MsgGetBlocksTy, wire.MsgBlockTy: break skip } } @@ -340,13 +340,13 @@ out: return } */ - p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, "")) + p.writeMessage(wire.NewMessage(wire.MsgPingTy, "")) p.pingStartTime = time.Now() // Service timer takes care of peer broadcasting, transaction // posting or block posting case <-serviceTimer.C: - p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) + p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, "")) case <-p.quit: // Break out of the for loop if a quit message is posted @@ -366,7 +366,7 @@ clean: } } -func formatMessage(msg *ethwire.Msg) (ret string) { +func formatMessage(msg *wire.Msg) (ret string) { ret = fmt.Sprintf("%v %v", msg.Type, msg.Data) /* @@ -375,12 +375,12 @@ func formatMessage(msg *ethwire.Msg) (ret string) { */ /* switch msg.Type { - case ethwire.MsgPeersTy: + case wire.MsgPeersTy: ret += fmt.Sprintf("(%d entries)", msg.Data.Len()) - case ethwire.MsgBlockTy: + case wire.MsgBlockTy: b1, b2 := chain.NewBlockFromRlpValue(msg.Data.Get(0)), ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len()-1)) ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), b1.Hash()[0:4], b2.Hash()[0:4]) - case ethwire.MsgBlockHashesTy: + case wire.MsgBlockHashesTy: h1, h2 := msg.Data.Get(0).Bytes(), msg.Data.Get(msg.Data.Len()-1).Bytes() ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), h1, h2) } @@ -396,7 +396,7 @@ func (p *Peer) HandleInbound() { // HMM? time.Sleep(50 * time.Millisecond) // Wait for a message from the peer - msgs, err := ethwire.ReadMessages(p.conn) + msgs, err := wire.ReadMessages(p.conn) if err != nil { peerlogger.Debugln(err) } @@ -404,27 +404,27 @@ func (p *Peer) HandleInbound() { peerlogger.DebugDetailf("(%v) => %v\n", p.conn.RemoteAddr(), formatMessage(msg)) switch msg.Type { - case ethwire.MsgHandshakeTy: + case wire.MsgHandshakeTy: // Version message p.handleHandshake(msg) //if p.caps.IsCap(CapPeerDiscTy) { - p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) + p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, "")) //} - case ethwire.MsgDiscTy: + case wire.MsgDiscTy: p.Stop() peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint())) - case ethwire.MsgPingTy: + case wire.MsgPingTy: // Respond back with pong - p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, "")) - case ethwire.MsgPongTy: + p.QueueMessage(wire.NewMessage(wire.MsgPongTy, "")) + case wire.MsgPongTy: // If we received a pong back from a peer we set the // last pong so the peer handler knows this peer is still // active. p.lastPong = time.Now().Unix() p.pingTime = time.Since(p.pingStartTime) - case ethwire.MsgTxTy: + case wire.MsgTxTy: // If the message was a transaction queue the transaction // in the TxPool where it will undergo validation and // processing when a new block is found @@ -432,10 +432,10 @@ func (p *Peer) HandleInbound() { tx := chain.NewTransactionFromValue(msg.Data.Get(i)) p.ethereum.TxPool().QueueTransaction(tx) } - case ethwire.MsgGetPeersTy: + case wire.MsgGetPeersTy: // Peer asked for list of connected peers //p.pushPeers() - case ethwire.MsgPeersTy: + case wire.MsgPeersTy: // Received a list of peers (probably because MsgGetPeersTy was send) data := msg.Data // Create new list of possible peers for the ethereum to process @@ -449,7 +449,7 @@ func (p *Peer) HandleInbound() { // Connect to the list of peers p.ethereum.ProcessPeerList(peers) - case ethwire.MsgStatusTy: + case wire.MsgStatusTy: // Handle peer's status msg p.handleStatus(msg) } @@ -458,7 +458,7 @@ func (p *Peer) HandleInbound() { if p.statusKnown { switch msg.Type { /* - case ethwire.MsgGetTxsTy: + case wire.MsgGetTxsTy: // Get the current transactions of the pool txs := p.ethereum.TxPool().CurrentTransactions() // Get the RlpData values from the txs @@ -467,10 +467,10 @@ func (p *Peer) HandleInbound() { txsInterface[i] = tx.RlpData() } // Broadcast it back to the peer - p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface)) + p.QueueMessage(wire.NewMessage(wire.MsgTxTy, txsInterface)) */ - case ethwire.MsgGetBlockHashesTy: + case wire.MsgGetBlockHashesTy: if msg.Data.Len() < 2 { peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) } @@ -480,9 +480,9 @@ func (p *Peer) HandleInbound() { hashes := p.ethereum.ChainManager().GetChainHashesFromHash(hash, amount) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) + p.QueueMessage(wire.NewMessage(wire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) - case ethwire.MsgGetBlocksTy: + case wire.MsgGetBlocksTy: // Limit to max 300 blocks max := int(math.Min(float64(msg.Data.Len()), 300.0)) var blocks []interface{} @@ -495,9 +495,9 @@ func (p *Peer) HandleInbound() { } } - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks)) + p.QueueMessage(wire.NewMessage(wire.MsgBlockTy, blocks)) - case ethwire.MsgBlockHashesTy: + case wire.MsgBlockHashesTy: p.catchingUp = true blockPool := p.ethereum.blockPool @@ -528,7 +528,7 @@ func (p *Peer) HandleInbound() { p.doneFetchingHashes = true } - case ethwire.MsgBlockTy: + case wire.MsgBlockTy: p.catchingUp = true blockPool := p.ethereum.blockPool @@ -540,7 +540,7 @@ func (p *Peer) HandleInbound() { p.lastBlockReceived = time.Now() } - case ethwire.MsgNewBlockTy: + case wire.MsgNewBlockTy: var ( blockPool = p.ethereum.blockPool block = chain.NewBlockFromRlpValue(msg.Data.Get(0)) @@ -563,7 +563,7 @@ func (self *Peer) FetchBlocks(hashes [][]byte) { if len(hashes) > 0 { peerlogger.Debugf("Fetching blocks (%d)\n", len(hashes)) - self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes))) + self.QueueMessage(wire.NewMessage(wire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes))) } } @@ -629,7 +629,7 @@ func (p *Peer) Start() { // Wait a few seconds for startup and then ask for an initial ping time.Sleep(2 * time.Second) - p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, "")) + p.writeMessage(wire.NewMessage(wire.MsgPingTy, "")) p.pingStartTime = time.Now() } @@ -648,12 +648,12 @@ func (p *Peer) StopWithReason(reason DiscReason) { close(p.quit) if atomic.LoadInt32(&p.connected) != 0 { - p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, reason)) + p.writeMessage(wire.NewMessage(wire.MsgDiscTy, reason)) p.conn.Close() } } -func (p *Peer) peersMessage() *ethwire.Msg { +func (p *Peer) peersMessage() *wire.Msg { outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) // Serialise each peer for i, peer := range p.ethereum.InOutPeers() { @@ -664,7 +664,7 @@ func (p *Peer) peersMessage() *ethwire.Msg { } // Return the message to the peer with the known list of connected clients - return ethwire.NewMessage(ethwire.MsgPeersTy, outPeers) + return wire.NewMessage(wire.MsgPeersTy, outPeers) } // Pushes the list of outbound peers to the client when requested @@ -673,7 +673,7 @@ func (p *Peer) pushPeers() { } func (self *Peer) pushStatus() { - msg := ethwire.NewMessage(ethwire.MsgStatusTy, []interface{}{ + msg := wire.NewMessage(wire.MsgStatusTy, []interface{}{ uint32(ProtocolVersion), uint32(NetVersion), self.ethereum.ChainManager().TD, @@ -684,7 +684,7 @@ func (self *Peer) pushStatus() { self.QueueMessage(msg) } -func (self *Peer) handleStatus(msg *ethwire.Msg) { +func (self *Peer) handleStatus(msg *wire.Msg) { c := msg.Data var ( @@ -729,7 +729,7 @@ func (self *Peer) handleStatus(msg *ethwire.Msg) { func (p *Peer) pushHandshake() error { pubkey := p.ethereum.KeyManager().PublicKey() - msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ + msg := wire.NewMessage(wire.MsgHandshakeTy, []interface{}{ P2PVersion, []byte(p.version), []interface{}{[]interface{}{"eth", ProtocolVersion}}, p.port, pubkey[1:], }) @@ -738,7 +738,7 @@ func (p *Peer) pushHandshake() error { return nil } -func (p *Peer) handleHandshake(msg *ethwire.Msg) { +func (p *Peer) handleHandshake(msg *wire.Msg) { c := msg.Data var ( diff --git a/wire/.gitignore b/wire/.gitignore new file mode 100644 index 000000000..f725d58d1 --- /dev/null +++ b/wire/.gitignore @@ -0,0 +1,12 @@ +# See http://help.github.com/ignore-files/ for more about ignoring files. +# +# If you find yourself ignoring temporary files generated by your text editor +# or operating system, you probably want to add a global ignore instead: +# git config --global core.excludesfile ~/.gitignore_global + +/tmp +*/**/*un~ +*un~ +.DS_Store +*/**/.DS_Store + diff --git a/wire/README.md b/wire/README.md new file mode 100644 index 000000000..7f63688b3 --- /dev/null +++ b/wire/README.md @@ -0,0 +1,36 @@ +# ethwire + +The ethwire package contains the ethereum wire protocol. The ethwire +package is required to write and read from the ethereum network. + +# Installation + +`go get github.com/ethereum/ethwire-go` + +# Messaging overview + +The Ethereum Wire protocol defines the communication between the nodes +running Ethereum. Further reader reading can be done on the +[Wiki](http://wiki.ethereum.org/index.php/Wire_Protocol). + +# Reading Messages + +```go +// Read and validate the next eth message from the provided connection. +// returns a error message with the details. +msg, err := ethwire.ReadMessage(conn) +if err != nil { + // Handle error +} +``` + +# Writing Messages + +```go +// Constructs a message which can be interpreted by the eth network. +// Write the inventory to network +err := ethwire.WriteMessage(conn, &Msg{ + Type: ethwire.MsgInvTy, + Data : []interface{}{...}, +}) +``` diff --git a/wire/client_identity.go b/wire/client_identity.go new file mode 100644 index 000000000..0a268024a --- /dev/null +++ b/wire/client_identity.go @@ -0,0 +1,56 @@ +package wire + +import ( + "fmt" + "runtime" +) + +// should be used in Peer handleHandshake, incorporate Caps, ProtocolVersion, Pubkey etc. +type ClientIdentity interface { + String() string +} + +type SimpleClientIdentity struct { + clientIdentifier string + version string + customIdentifier string + os string + implementation string +} + +func NewSimpleClientIdentity(clientIdentifier string, version string, customIdentifier string) *SimpleClientIdentity { + clientIdentity := &SimpleClientIdentity{ + clientIdentifier: clientIdentifier, + version: version, + customIdentifier: customIdentifier, + os: runtime.GOOS, + implementation: runtime.Version(), + } + + return clientIdentity +} + +func (c *SimpleClientIdentity) init() { +} + +func (c *SimpleClientIdentity) String() string { + var id string + if len(c.customIdentifier) > 0 { + id = "/" + c.customIdentifier + } + + return fmt.Sprintf("%s/v%s%s/%s/%s", + c.clientIdentifier, + c.version, + id, + c.os, + c.implementation) +} + +func (c *SimpleClientIdentity) SetCustomIdentifier(customIdentifier string) { + c.customIdentifier = customIdentifier +} + +func (c *SimpleClientIdentity) GetCustomIdentifier() string { + return c.customIdentifier +} diff --git a/wire/client_identity_test.go b/wire/client_identity_test.go new file mode 100644 index 000000000..c0e7a0159 --- /dev/null +++ b/wire/client_identity_test.go @@ -0,0 +1,30 @@ +package wire + +import ( + "fmt" + "runtime" + "testing" +) + +func TestClientIdentity(t *testing.T) { + clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test") + clientString := clientIdentity.String() + expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version()) + if clientString != expected { + t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) + } + customIdentifier := clientIdentity.GetCustomIdentifier() + if customIdentifier != "test" { + t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test', got %q", customIdentifier) + } + clientIdentity.SetCustomIdentifier("test2") + customIdentifier = clientIdentity.GetCustomIdentifier() + if customIdentifier != "test2" { + t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test2', got %q", customIdentifier) + } + clientString = clientIdentity.String() + expected = fmt.Sprintf("Ethereum(G)/v0.5.16/test2/%s/%s", runtime.GOOS, runtime.Version()) + if clientString != expected { + t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) + } +} diff --git a/wire/messages2.go b/wire/messages2.go new file mode 100644 index 000000000..acbd9e0d5 --- /dev/null +++ b/wire/messages2.go @@ -0,0 +1,199 @@ +package wire + +import ( + "bytes" + "errors" + "fmt" + "net" + "time" + + "github.com/ethereum/go-ethereum/ethutil" +) + +// The connection object allows you to set up a connection to the Ethereum network. +// The Connection object takes care of all encoding and sending objects properly over +// the network. +type Connection struct { + conn net.Conn + nTimeout time.Duration + pendingMessages Messages +} + +// Create a new connection to the Ethereum network +func New(conn net.Conn) *Connection { + return &Connection{conn: conn, nTimeout: 500} +} + +// Read, reads from the network. It will block until the next message is received. +func (self *Connection) Read() *Msg { + if len(self.pendingMessages) == 0 { + self.readMessages() + } + + ret := self.pendingMessages[0] + self.pendingMessages = self.pendingMessages[1:] + + return ret + +} + +// Write to the Ethereum network specifying the type of the message and +// the data. Data can be of type RlpEncodable or []interface{}. Returns +// nil or if something went wrong an error. +func (self *Connection) Write(typ MsgType, v ...interface{}) error { + var pack []byte + + slice := [][]interface{}{[]interface{}{byte(typ)}} + for _, value := range v { + if encodable, ok := value.(ethutil.RlpEncodeDecode); ok { + slice = append(slice, encodable.RlpValue()) + } else if raw, ok := value.([]interface{}); ok { + slice = append(slice, raw) + } else { + panic(fmt.Sprintf("Unable to 'write' object of type %T", value)) + } + } + + // Encode the type and the (RLP encoded) data for sending over the wire + encoded := ethutil.NewValue(slice).Encode() + payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) + + // Write magic token and payload length (first 8 bytes) + pack = append(MagicToken, payloadLength...) + pack = append(pack, encoded...) + + // Write to the connection + _, err := self.conn.Write(pack) + if err != nil { + return err + } + + return nil +} + +func (self *Connection) readMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { + if len(data) == 0 { + return nil, nil, true, nil + } + + if len(data) <= 8 { + return nil, remaining, false, errors.New("Invalid message") + } + + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, data[:4]) != 0 { + return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) + } + + messageLength := ethutil.BytesToNumber(data[4:8]) + remaining = data[8+messageLength:] + if int(messageLength) > len(data[8:]) { + return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) + } + + message := data[8 : 8+messageLength] + decoder := ethutil.NewValueFromBytes(message) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msg = &Msg{ + Type: MsgType(t), + Data: d, + } + + return +} + +// The basic message reader waits for data on the given connection, decoding +// and doing a few sanity checks such as if there's a data type and +// unmarhals the given data +func (self *Connection) readMessages() (err error) { + // The recovering function in case anything goes horribly wrong + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("wire.ReadMessage error: %v", r) + } + }() + + // Buff for writing network message to + //buff := make([]byte, 1440) + var buff []byte + var totalBytes int + for { + // Give buffering some time + self.conn.SetReadDeadline(time.Now().Add(self.nTimeout * time.Millisecond)) + // Create a new temporarily buffer + b := make([]byte, 1440) + // Wait for a message from this peer + n, _ := self.conn.Read(b) + if err != nil && n == 0 { + if err.Error() != "EOF" { + fmt.Println("err now", err) + return err + } else { + break + } + + // Messages can't be empty + } else if n == 0 { + break + } + + buff = append(buff, b[:n]...) + totalBytes += n + } + + // Reslice buffer + buff = buff[:totalBytes] + msg, remaining, done, err := self.readMessage(buff) + for ; done != true; msg, remaining, done, err = self.readMessage(remaining) { + //log.Println("rx", msg) + + if msg != nil { + self.pendingMessages = append(self.pendingMessages, msg) + } + } + + return +} + +func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { + if len(data) == 0 { + return nil, nil, true, nil + } + + if len(data) <= 8 { + return nil, remaining, false, errors.New("Invalid message") + } + + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, data[:4]) != 0 { + return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) + } + + messageLength := ethutil.BytesToNumber(data[4:8]) + remaining = data[8+messageLength:] + if int(messageLength) > len(data[8:]) { + return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) + } + + message := data[8 : 8+messageLength] + decoder := ethutil.NewValueFromBytes(message) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msg = &Msg{ + Type: MsgType(t), + Data: d, + } + + return +} + +func bufferedRead(conn net.Conn) ([]byte, error) { + return nil, nil +} diff --git a/wire/messaging.go b/wire/messaging.go new file mode 100644 index 000000000..b919aa0f4 --- /dev/null +++ b/wire/messaging.go @@ -0,0 +1,179 @@ +// Package wire provides low level access to the Ethereum network and allows +// you to broadcast data over the network. +package wire + +import ( + "bytes" + "fmt" + "net" + "time" + + "github.com/ethereum/go-ethereum/ethutil" +) + +// Connection interface describing the methods required to implement the wire protocol. +type Conn interface { + Write(typ MsgType, v ...interface{}) error + Read() *Msg +} + +// The magic token which should be the first 4 bytes of every message and can be used as separator between messages. +var MagicToken = []byte{34, 64, 8, 145} + +type MsgType byte + +const ( + // Values are given explicitly instead of by iota because these values are + // defined by the wire protocol spec; it is easier for humans to ensure + // correctness when values are explicit. + MsgHandshakeTy = 0x00 + MsgDiscTy = 0x01 + MsgPingTy = 0x02 + MsgPongTy = 0x03 + MsgGetPeersTy = 0x04 + MsgPeersTy = 0x05 + + MsgStatusTy = 0x10 + //MsgGetTxsTy = 0x11 + MsgTxTy = 0x12 + MsgGetBlockHashesTy = 0x13 + MsgBlockHashesTy = 0x14 + MsgGetBlocksTy = 0x15 + MsgBlockTy = 0x16 + MsgNewBlockTy = 0x17 +) + +var msgTypeToString = map[MsgType]string{ + MsgHandshakeTy: "Handshake", + MsgDiscTy: "Disconnect", + MsgPingTy: "Ping", + MsgPongTy: "Pong", + MsgGetPeersTy: "Get peers", + MsgStatusTy: "Status", + MsgPeersTy: "Peers", + MsgTxTy: "Transactions", + MsgBlockTy: "Blocks", + //MsgGetTxsTy: "Get Txs", + MsgGetBlockHashesTy: "Get block hashes", + MsgBlockHashesTy: "Block hashes", + MsgGetBlocksTy: "Get blocks", +} + +func (mt MsgType) String() string { + return msgTypeToString[mt] +} + +type Msg struct { + Type MsgType // Specifies how the encoded data should be interpreted + //Data []byte + Data *ethutil.Value +} + +func NewMessage(msgType MsgType, data interface{}) *Msg { + return &Msg{ + Type: msgType, + Data: ethutil.NewValue(data), + } +} + +type Messages []*Msg + +// The basic message reader waits for data on the given connection, decoding +// and doing a few sanity checks such as if there's a data type and +// unmarhals the given data +func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { + // The recovering function in case anything goes horribly wrong + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("wire.ReadMessage error: %v", r) + } + }() + + var ( + buff []byte + messages [][]byte + msgLength int + ) + + for { + // Give buffering some time + conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) + // Create a new temporarily buffer + b := make([]byte, 1440) + n, _ := conn.Read(b) + if err != nil && n == 0 { + if err.Error() != "EOF" { + fmt.Println("err now", err) + return nil, err + } else { + break + } + } + + if n == 0 && len(buff) == 0 { + // If there's nothing on the wire wait for a bit + time.Sleep(200 * time.Millisecond) + + continue + } + + buff = append(buff, b[:n]...) + if msgLength == 0 { + // Check if the received 4 first bytes are the magic token + if bytes.Compare(MagicToken, buff[:4]) != 0 { + return nil, fmt.Errorf("MagicToken mismatch. Received %v", buff[:4]) + } + + // Read the length of the message + msgLength = int(ethutil.BytesToNumber(buff[4:8])) + + // Remove the token and length + buff = buff[8:] + } + + if len(buff) >= msgLength { + messages = append(messages, buff[:msgLength]) + buff = buff[msgLength:] + msgLength = 0 + + if len(buff) == 0 { + break + } + } + } + + for _, m := range messages { + decoder := ethutil.NewValueFromBytes(m) + // Type of message + t := decoder.Get(0).Uint() + // Actual data + d := decoder.SliceFrom(1) + + msgs = append(msgs, &Msg{Type: MsgType(t), Data: d}) + } + + return +} + +// The basic message writer takes care of writing data over the given +// connection and does some basic error checking +func WriteMessage(conn net.Conn, msg *Msg) error { + var pack []byte + + // Encode the type and the (RLP encoded) data for sending over the wire + encoded := ethutil.NewValue(append([]interface{}{byte(msg.Type)}, msg.Data.Slice()...)).Encode() + payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) + + // Write magic token and payload length (first 8 bytes) + pack = append(MagicToken, payloadLength...) + pack = append(pack, encoded...) + //fmt.Printf("payload %v (%v) %q\n", msg.Type, conn.RemoteAddr(), encoded) + + // Write to the connection + _, err := conn.Write(pack) + if err != nil { + return err + } + + return nil +} -- cgit