diff options
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 324 |
1 files changed, 144 insertions, 180 deletions
@@ -4,15 +4,18 @@ import ( "bytes" "container/list" "fmt" - "github.com/ethereum/eth-go/ethchain" - "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethutil" - "github.com/ethereum/eth-go/ethwire" + "math" + "math/big" "net" "strconv" "strings" "sync/atomic" "time" + + "github.com/ethereum/eth-go/ethchain" + "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethwire" ) var peerlogger = ethlog.NewLogger("PEER") @@ -21,7 +24,7 @@ const ( // The size of the output buffer for writing messages outputBufferSize = 50 // Current protocol version - ProtocolVersion = 23 + ProtocolVersion = 27 // Interval for ping/pong message pingPongTimer = 2 * time.Second ) @@ -124,9 +127,13 @@ type Peer struct { lastPong int64 lastBlockReceived time.Time - host []byte - port uint16 - caps Caps + host []byte + port uint16 + caps Caps + td *big.Int + bestHash []byte + lastReceivedHash []byte + requestedHashes [][]byte // This peer's public key pubkey []byte @@ -197,10 +204,12 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { } func (self *Peer) Connect(addr string) (conn net.Conn, err error) { - for attempts := 0; attempts < 5; attempts++ { + const maxTries = 3 + for attempts := 0; attempts < maxTries; attempts++ { conn, err = net.DialTimeout("tcp", addr, 10*time.Second) if err != nil { - peerlogger.Debugf("Peer connection failed. Retrying (%d/5)\n", attempts+1) + //peerlogger.Debugf("Peer connection failed. Retrying (%d/%d) (%s)\n", attempts+1, maxTries, addr) + time.Sleep(time.Duration(attempts*20) * time.Second) continue } @@ -291,12 +300,14 @@ out: // Ping timer case <-pingTimer.C: - timeSince := time.Since(time.Unix(p.lastPong, 0)) - if !p.pingStartTime.IsZero() && p.lastPong != 0 && timeSince > (pingPongTimer+30*time.Second) { - peerlogger.Infof("Peer did not respond to latest pong fast enough, it took %s, disconnecting.\n", timeSince) - p.Stop() - return - } + /* + timeSince := time.Since(time.Unix(p.lastPong, 0)) + if !p.pingStartTime.IsZero() && p.lastPong != 0 && timeSince > (pingPongTimer+30*time.Second) { + peerlogger.Infof("Peer did not respond to latest pong fast enough, it took %s, disconnecting.\n", timeSince) + p.Stop() + return + } + */ p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, "")) p.pingStartTime = time.Now() @@ -340,7 +351,6 @@ func (p *Peer) HandleInbound() { for _, msg := range msgs { peerlogger.DebugDetailf("(%v) => %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data) - nextMsg: switch msg.Type { case ethwire.MsgHandshakeTy: // Version message @@ -349,9 +359,10 @@ func (p *Peer) HandleInbound() { if p.caps.IsCap(CapPeerDiscTy) { p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) } + case ethwire.MsgDiscTy: p.Stop() - peerlogger.Infoln("Disconnect peer:", DiscReason(msg.Data.Get(0).Uint())) + peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint())) case ethwire.MsgPingTy: // Respond back with pong p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, "")) @@ -360,111 +371,7 @@ func (p *Peer) HandleInbound() { // last pong so the peer handler knows this peer is still // active. p.lastPong = time.Now().Unix() - p.pingTime = time.Now().Sub(p.pingStartTime) - case ethwire.MsgBlockTy: - // Get all blocks and process them - var block, lastBlock *ethchain.Block - var err error - - // Make sure we are actually receiving anything - if msg.Data.Len()-1 > 1 && p.diverted { - // We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find - // common ground to start syncing from - lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1)) - if p.lastRequestedBlock != nil && bytes.Compare(lastBlock.Hash(), p.lastRequestedBlock.Hash()) == 0 { - p.catchingUp = false - continue - } - p.lastRequestedBlock = lastBlock - peerlogger.Infof("Last block: %x. Checking if we have it locally.\n", lastBlock.Hash()) - for i := msg.Data.Len() - 1; i >= 0; i-- { - block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) - // Do we have this block on our chain? If so we can continue - if !p.ethereum.StateManager().BlockChain().HasBlock(block.Hash()) { - // We don't have this block, but we do have a block with the same prevHash, diversion time! - if p.ethereum.StateManager().BlockChain().HasBlockWithPrevHash(block.PrevHash) { - p.diverted = false - if !p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) { - p.SyncWithPeerToLastKnown() - break nextMsg - } - break - } - } - } - if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) { - // If we can't find a common ancenstor we need to request more blocks. - // FIXME: At one point this won't scale anymore since we are not asking for an offset - // we just keep increasing the amount of blocks. - p.blocksRequested = p.blocksRequested * 2 - - peerlogger.Infof("No common ancestor found, requesting %d more blocks.\n", p.blocksRequested) - p.FindCommonParentBlock() - break nextMsg - } - - p.catchingUp = false - } - - for i := msg.Data.Len() - 1; i >= 0; i-- { - block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) - - err = p.ethereum.StateManager().Process(block, false) - if err != nil { - if ethutil.Config.Debug { - peerlogger.Infof("Block %x failed\n", block.Hash()) - peerlogger.Infof("%v\n", err) - peerlogger.Debugln(block) - } - break - } else { - lastBlock = block - } - - p.lastBlockReceived = time.Now() - } - - if msg.Data.Len() <= 1 { - // Set catching up to false if - // the peer has nothing left to give - p.catchingUp = false - } - - if err != nil { - // If the parent is unknown try to catch up with this peer - if ethchain.IsParentErr(err) { - /* - b := ethchain.NewBlockFromRlpValue(msg.Data.Get(0)) - - peerlogger.Infof("Attempting to catch (%x). Parent known\n", b.Hash()) - p.catchingUp = false - - p.CatchupWithPeer(b.Hash()) - - peerlogger.Infoln(b) - */ - peerlogger.Infoln("Attempting to catch. Parent known") - p.catchingUp = false - p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash()) - } else if ethchain.IsValidationErr(err) { - fmt.Println("Err:", err) - p.catchingUp = false - } - } else { - // If we're catching up, try to catch up further. - if p.catchingUp && msg.Data.Len() > 1 { - if lastBlock != nil { - blockInfo := lastBlock.BlockInfo() - peerlogger.DebugDetailf("Synced chain to #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash) - } - - p.catchingUp = false - - hash := p.ethereum.BlockChain().CurrentBlock.Hash() - p.CatchupWithPeer(hash) - } - } - + p.pingTime = time.Since(p.pingStartTime) case ethwire.MsgTxTy: // If the message was a transaction queue the transaction // in the TxPool where it will undergo validation and @@ -489,78 +396,124 @@ func (p *Peer) HandleInbound() { // Connect to the list of peers p.ethereum.ProcessPeerList(peers) - case ethwire.MsgGetChainTy: - var parent *ethchain.Block - // Length minus one since the very last element in the array is a count - l := msg.Data.Len() - 1 - // Ignore empty get chains - if l == 0 { - break + case ethwire.MsgGetTxsTy: + // Get the current transactions of the pool + txs := p.ethereum.TxPool().CurrentTransactions() + // Get the RlpData values from the txs + txsInterface := make([]interface{}, len(txs)) + for i, tx := range txs { + txsInterface[i] = tx.RlpData() + } + // Broadcast it back to the peer + p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface)) + + case ethwire.MsgGetBlockHashesTy: + if msg.Data.Len() < 2 { + peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) } - // Amount of parents in the canonical chain - //amountOfBlocks := msg.Data.Get(l).AsUint() - amountOfBlocks := uint64(100) + hash := msg.Data.Get(0).Bytes() + amount := msg.Data.Get(1).Uint() - // Check each SHA block hash from the message and determine whether - // the SHA is in the database - for i := 0; i < l; i++ { - if data := msg.Data.Get(i).Bytes(); p.ethereum.StateManager().BlockChain().HasBlock(data) { - parent = p.ethereum.BlockChain().GetBlock(data) - break + hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount) + + p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) + + case ethwire.MsgGetBlocksTy: + // Limit to max 300 blocks + max := int(math.Min(float64(msg.Data.Len()), 300.0)) + var blocks []interface{} + + for i := 0; i < max; i++ { + hash := msg.Data.Get(i).Bytes() + block := p.ethereum.BlockChain().GetBlock(hash) + if block != nil { + blocks = append(blocks, block.Value().Raw()) } } - // If a parent is found send back a reply - if parent != nil { - peerlogger.DebugDetailf("Found canonical block, returning chain from: %x ", parent.Hash()) - chain := p.ethereum.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks) - if len(chain) > 0 { - //peerlogger.Debugf("Returning %d blocks: %x ", len(chain), parent.Hash()) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain)) - } else { - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, []interface{}{})) - } + p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks)) - } else { - //peerlogger.Debugf("Could not find a similar block") - // If no blocks are found we send back a reply with msg not in chain - // and the last hash from get chain - if l > 0 { - lastHash := msg.Data.Get(l - 1) - //log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw()) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()})) + case ethwire.MsgBlockHashesTy: + p.catchingUp = true + + blockPool := p.ethereum.blockPool + + foundCommonHash := false + + it := msg.Data.NewIterator() + for it.Next() { + hash := it.Value().Bytes() + + if blockPool.HasCommonHash(hash) { + foundCommonHash = true + + break } + + blockPool.AddHash(hash) + + p.lastReceivedHash = hash + + p.lastBlockReceived = time.Now() } - case ethwire.MsgNotInChainTy: - peerlogger.DebugDetailf("Not in chain: %x\n", msg.Data.Get(0).Bytes()) - if p.diverted == true { - // If were already looking for a common parent and we get here again we need to go deeper - p.blocksRequested = p.blocksRequested * 2 + + if foundCommonHash { + p.FetchBlocks() + } else { + p.FetchHashes() } - p.diverted = true - p.catchingUp = false - p.FindCommonParentBlock() - case ethwire.MsgGetTxsTy: - // Get the current transactions of the pool - txs := p.ethereum.TxPool().CurrentTransactions() - // Get the RlpData values from the txs - txsInterface := make([]interface{}, len(txs)) - for i, tx := range txs { - txsInterface[i] = tx.RlpData() + + case ethwire.MsgBlockTy: + p.catchingUp = true + + blockPool := p.ethereum.blockPool + + it := msg.Data.NewIterator() + + for it.Next() { + block := ethchain.NewBlockFromRlpValue(it.Value()) + + blockPool.SetBlock(block) + + p.lastBlockReceived = time.Now() } - // Broadcast it back to the peer - p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface)) - // Unofficial but fun nonetheless - case ethwire.MsgTalkTy: - peerlogger.Infoln("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.Str()) + linked := blockPool.CheckLinkAndProcess(func(block *ethchain.Block) { + p.ethereum.StateManager().Process(block, false) + }) + + if !linked { + p.FetchBlocks() + } } } } + p.Stop() } +func (self *Peer) FetchBlocks() { + blockPool := self.ethereum.blockPool + + hashes := blockPool.Take(100, self) + if len(hashes) > 0 { + self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes))) + } +} + +func (self *Peer) FetchHashes() { + blockPool := self.ethereum.blockPool + + if self.td.Cmp(blockPool.td) >= 0 { + peerlogger.Debugf("Requesting hashes from %x\n", self.lastReceivedHash) + + if !blockPool.HasLatestHash() { + self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(200)})) + } + } +} + // General update method func (self *Peer) update() { serviceTimer := time.NewTicker(5 * time.Second) @@ -631,6 +584,7 @@ func (p *Peer) pushHandshake() error { pubkey := p.ethereum.KeyManager().PublicKey() msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ uint32(ProtocolVersion), uint32(0), []byte(p.version), byte(p.caps), p.port, pubkey[1:], + p.ethereum.BlockChain().TD.Uint64(), p.ethereum.BlockChain().CurrentBlock.Hash(), }) p.QueueMessage(msg) @@ -716,10 +670,15 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { p.SetVersion(c.Get(2).Str()) } + // Get the td and last hash + p.td = c.Get(6).BigInt() + p.bestHash = c.Get(7).Bytes() + p.lastReceivedHash = p.bestHash + p.ethereum.PushPeer(p) p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) - ethlogger.Infof("Added peer (%s) %d / %d\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers) + ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash) /* // Catch up with the connected peer @@ -728,7 +687,12 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { time.Sleep(10 * time.Second) } */ - p.SyncWithPeerToLastKnown() + //p.SyncWithPeerToLastKnown() + + if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 { + p.ethereum.blockPool.AddHash(p.lastReceivedHash) + p.FetchHashes() + } peerlogger.Debugln(p) } @@ -782,7 +746,7 @@ func (p *Peer) CatchupWithPeer(blockHash []byte) { if !p.catchingUp { // Make sure nobody else is catching up when you want to do this p.catchingUp = true - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(30)}) + msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(100)}) p.QueueMessage(msg) peerlogger.DebugDetailf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr()) |