aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaran <maran.hidskes@gmail.com>2014-05-20 17:50:34 +0800
committerMaran <maran.hidskes@gmail.com>2014-05-20 17:50:34 +0800
commit12f30e6220354c4a8b08ecf41bb53444143f3660 (patch)
tree4397ab1b50723a2c6ad9ce6980f49bb137c48f1f
parentb8034f4d9ed7eea29a219a2d894ae22041a906a7 (diff)
downloaddexon-12f30e6220354c4a8b08ecf41bb53444143f3660.tar.gz
dexon-12f30e6220354c4a8b08ecf41bb53444143f3660.tar.zst
dexon-12f30e6220354c4a8b08ecf41bb53444143f3660.zip
Refactored a lot of the chain catchup/reorg.
-rw-r--r--ethchain/block_chain.go2
-rw-r--r--ethereum.go4
-rw-r--r--peer.go125
3 files changed, 76 insertions, 55 deletions
diff --git a/ethchain/block_chain.go b/ethchain/block_chain.go
index 6c3b15a6a..2ce0f90a6 100644
--- a/ethchain/block_chain.go
+++ b/ethchain/block_chain.go
@@ -127,7 +127,6 @@ func (bc *BlockChain) FindCanonicalChain(blocks []*Block, commonBlockHash []byte
log.Println("[CHAIN] We have found the common parent block, breaking")
break
}
- log.Println("Checking incoming blocks:")
chainDifficulty.Add(chainDifficulty, bc.CalculateBlockTD(block))
}
@@ -182,6 +181,7 @@ func (bc *BlockChain) ResetTillBlockHash(hash []byte) error {
// XXX Why are we resetting? This is the block chain, it has nothing to do with states
//bc.Ethereum.StateManager().PrepareDefault(returnTo)
+ // Manually reset the last sync block
err := ethutil.Config.Db.Delete(lastBlock.Hash())
if err != nil {
return err
diff --git a/ethereum.go b/ethereum.go
index 14418023e..3a7202d53 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -222,7 +222,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
if phost == chost {
alreadyConnected = true
- ethutil.Config.Log.Debugf("[SERV] Peer %s already added.\n", chost)
+ //ethutil.Config.Log.Debugf("[SERV] Peer %s already added.\n", chost)
return
}
})
@@ -235,7 +235,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
s.peers.PushBack(peer)
- ethutil.Config.Log.Infof("[SERV] Adding peer %d / %d\n", s.peers.Len(), s.MaxPeers)
+ ethutil.Config.Log.Infof("[SERV] Adding peer (%s) %d / %d\n", addr, s.peers.Len(), s.MaxPeers)
}
return nil
diff --git a/peer.go b/peer.go
index 45ff0a795..879361b2b 100644
--- a/peer.go
+++ b/peer.go
@@ -127,6 +127,7 @@ type Peer struct {
// Indicated whether the node is catching up or not
catchingUp bool
+ diverted bool
blocksRequested int
Version string
@@ -190,7 +191,6 @@ func (p *Peer) QueueMessage(msg *ethwire.Msg) {
if atomic.LoadInt32(&p.connected) != 1 {
return
}
-
p.outputQueue <- msg
}
@@ -268,7 +268,6 @@ func (p *Peer) HandleInbound() {
for atomic.LoadInt32(&p.disconnect) == 0 {
// HMM?
time.Sleep(500 * time.Millisecond)
-
// Wait for a message from the peer
msgs, err := ethwire.ReadMessages(p.conn)
if err != nil {
@@ -300,32 +299,36 @@ func (p *Peer) HandleInbound() {
var err error
// Make sure we are actually receiving anything
- if msg.Data.Len()-1 > 1 && p.catchingUp {
+ if msg.Data.Len()-1 > 1 && p.diverted {
// We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find
// common ground to start syncing from
lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1))
- if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) {
- // If we can't find a common ancenstor we need to request more blocks.
- // FIXME: At one point this won't scale anymore since we are not asking for an offset
- // we just keep increasing the amount of blocks.
- //fmt.Println("[PEER] No common ancestor found, requesting more blocks.")
- p.blocksRequested = p.blocksRequested * 2
- p.catchingUp = false
- p.SyncWithBlocks()
- }
-
+ ethutil.Config.Log.Infof("[PEER] Last block: %x. Checking if we have it locally.\n", lastBlock.Hash())
for i := msg.Data.Len() - 1; i >= 0; i-- {
block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
// Do we have this block on our chain? If so we can continue
if !p.ethereum.StateManager().BlockChain().HasBlock(block.Hash()) {
// We don't have this block, but we do have a block with the same prevHash, diversion time!
if p.ethereum.StateManager().BlockChain().HasBlockWithPrevHash(block.PrevHash) {
- if p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) {
- return
+ p.diverted = false
+ if !p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) {
+ p.SyncWithPeerToLastKnown()
}
+ break
}
}
}
+ if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) {
+ // If we can't find a common ancenstor we need to request more blocks.
+ // FIXME: At one point this won't scale anymore since we are not asking for an offset
+ // we just keep increasing the amount of blocks.
+ p.blocksRequested = p.blocksRequested * 2
+
+ ethutil.Config.Log.Infof("[PEER] No common ancestor found, requesting %d more blocks.\n", p.blocksRequested)
+ p.catchingUp = false
+ p.FindCommonParentBlock()
+ break
+ }
}
for i := msg.Data.Len() - 1; i >= 0; i-- {
@@ -346,23 +349,28 @@ func (p *Peer) HandleInbound() {
}
}
+ if msg.Data.Len() == 0 {
+ // Set catching up to false if
+ // the peer has nothing left to give
+ p.catchingUp = false
+ }
+
if err != nil {
// If the parent is unknown try to catch up with this peer
if ethchain.IsParentErr(err) {
- ethutil.Config.Log.Infoln("Attempting to catch up")
+ ethutil.Config.Log.Infoln("Attempting to catch up since we don't know the parent")
p.catchingUp = false
p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
} else if ethchain.IsValidationErr(err) {
- fmt.Println(err)
+ fmt.Println("Err:", err)
p.catchingUp = false
}
} else {
- // XXX Do we want to catch up if there were errors?
// If we're catching up, try to catch up further.
if p.catchingUp && msg.Data.Len() > 1 {
- if ethutil.Config.Debug && lastBlock != nil {
+ if lastBlock != nil {
blockInfo := lastBlock.BlockInfo()
- ethutil.Config.Log.Infof("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
+ ethutil.Config.Log.Debugf("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
}
p.catchingUp = false
@@ -372,11 +380,6 @@ func (p *Peer) HandleInbound() {
}
}
- if msg.Data.Len() == 0 {
- // Set catching up to false if
- // the peer has nothing left to give
- p.catchingUp = false
- }
case ethwire.MsgTxTy:
// If the message was a transaction queue the transaction
// in the TxPool where it will undergo validation and
@@ -444,7 +447,7 @@ func (p *Peer) HandleInbound() {
}
} else {
- ethutil.Config.Log.Debugf("[PEER] Could not find a similar block")
+ //ethutil.Config.Log.Debugf("[PEER] Could not find a similar block")
// If no blocks are found we send back a reply with msg not in chain
// and the last hash from get chain
lastHash := msg.Data.Get(l - 1)
@@ -452,8 +455,14 @@ func (p *Peer) HandleInbound() {
p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()}))
}
case ethwire.MsgNotInChainTy:
- ethutil.Config.Log.Debugf("Not in chain %x\n", msg.Data)
- // TODO
+ ethutil.Config.Log.Debugf("Not in chain: %x\n", msg.Data.Get(0).Bytes())
+ if p.diverted == true {
+ // If were already looking for a common parent and we get here again we need to go deeper
+ p.blocksRequested = p.blocksRequested * 2
+ }
+ p.diverted = true
+ p.catchingUp = false
+ p.FindCommonParentBlock()
case ethwire.MsgGetTxsTy:
// Get the current transactions of the pool
txs := p.ethereum.TxPool().CurrentTransactions()
@@ -471,7 +480,6 @@ func (p *Peer) HandleInbound() {
}
}
}
-
p.Stop()
}
@@ -581,14 +589,18 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
}
- // Catch up with the connected peer
- p.SyncWithBlocks()
-
// Set the peer's caps
p.caps = Caps(c.Get(3).Byte())
// Get a reference to the peers version
p.Version = c.Get(2).Str()
+ // Catch up with the connected peer
+ if !p.ethereum.IsUpToDate() {
+ ethutil.Config.Log.Debugln("Already syncing up with a peer; sleeping")
+ time.Sleep(10 * time.Second)
+ }
+ p.SyncWithPeerToLastKnown()
+
ethutil.Config.Log.Debugln("[PEER]", p)
}
@@ -609,38 +621,47 @@ func (p *Peer) String() string {
return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.Version, p.caps)
}
-func (p *Peer) SyncWithBlocks() {
- if !p.catchingUp {
- p.catchingUp = true
- // FIXME: THIS SHOULD NOT BE NEEDED
- if p.blocksRequested == 0 {
- p.blocksRequested = 10
- }
- blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested)
+func (p *Peer) SyncWithPeerToLastKnown() {
+ p.catchingUp = false
+ p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
+}
- var hashes []interface{}
- for _, block := range blocks {
- hashes = append(hashes, block.Hash())
- }
+func (p *Peer) FindCommonParentBlock() {
+ if p.catchingUp {
+ return
+ }
- msgInfo := append(hashes, uint64(50))
+ p.catchingUp = true
+ if p.blocksRequested == 0 {
+ p.blocksRequested = 20
+ }
+ blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested)
- msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo)
- p.QueueMessage(msg)
+ var hashes []interface{}
+ for _, block := range blocks {
+ hashes = append(hashes, block.Hash())
}
-}
+ msgInfo := append(hashes, uint64(len(hashes)))
+
+ ethutil.Config.Log.Infof("Asking for block from %x (%d total) from %s\n", p.ethereum.BlockChain().CurrentBlock.Hash(), len(hashes), p.conn.RemoteAddr().String())
+
+ msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo)
+ p.QueueMessage(msg)
+}
func (p *Peer) CatchupWithPeer(blockHash []byte) {
if !p.catchingUp {
+ // Make sure nobody else is catching up when you want to do this
p.catchingUp = true
msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(50)})
p.QueueMessage(msg)
- ethutil.Config.Log.Debugf("Requesting blockchain %x...\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4])
+ ethutil.Config.Log.Debugf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr())
- msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{})
- p.QueueMessage(msg)
- ethutil.Config.Log.Debugln("Requested transactions")
+ /*
+ msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{})
+ p.QueueMessage(msg)
+ */
}
}