aboutsummaryrefslogtreecommitdiffstats
path: root/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'peer.go')
-rw-r--r--peer.go185
1 files changed, 140 insertions, 45 deletions
diff --git a/peer.go b/peer.go
index 24a5e97c9..80ddc5142 100644
--- a/peer.go
+++ b/peer.go
@@ -125,7 +125,8 @@ type Peer struct {
pubkey []byte
// Indicated whether the node is catching up or not
- catchingUp bool
+ catchingUp bool
+ blocksRequested int
Version string
}
@@ -135,15 +136,16 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
pubkey := ethutil.NewValueFromBytes(data).Get(2).Bytes()
return &Peer{
- outputQueue: make(chan *ethwire.Msg, outputBufferSize),
- quit: make(chan bool),
- ethereum: ethereum,
- conn: conn,
- inbound: inbound,
- disconnect: 0,
- connected: 1,
- port: 30303,
- pubkey: pubkey,
+ outputQueue: make(chan *ethwire.Msg, outputBufferSize),
+ quit: make(chan bool),
+ ethereum: ethereum,
+ conn: conn,
+ inbound: inbound,
+ disconnect: 0,
+ connected: 1,
+ port: 30303,
+ pubkey: pubkey,
+ blocksRequested: 10,
}
}
@@ -290,17 +292,69 @@ func (p *Peer) HandleInbound() {
// Get all blocks and process them
var block, lastBlock *ethchain.Block
var err error
+
+ // 1. Compare the first block over the wire's prev-hash with the hash of your last block
+ // 2. If these two values are the same you can just link the chains together.
+ // [1:0,2:1,3:2] <- Current blocks (format block:previous_block)
+ // [1:0,2:1,3:2,4:3,5:4] <- incoming blocks
+ // == [1,2,3,4,5]
+ // 3. If the values are not the same we will have to go back and calculate the chain with the highest total difficulty
+ // [1:0,2:1,3:2,11:3,12:11,13:12]
+ // [1:0,2:1,3:2,4:3,5:4,6:5]
+
+ // [3:2,11:3,12:11,13:12]
+ // [3:2,4:3,5:4,6:5]
+ // Heb ik dit blok?
+ // Nee: heb ik een blok met PrevHash 3?
+ // Ja: DIVERSION
+ // Nee; Adding to chain
+
+ // See if we can find a common ancestor
+ // 1. Get the earliest block in the package.
+ // 2. Do we have this block?
+ // 3. Yes: Let's continue what we are doing
+ // 4. No: Let's request more blocks back.
+
+ // Make sure we are actually receiving anything
+ if msg.Data.Len()-1 > 1 && p.catchingUp {
+ // We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find
+ // common ground to start syncing from
+ lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1))
+ if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) {
+ // If we can't find a common ancenstor we need to request more blocks.
+ // FIXME: At one point this won't scale anymore since we are not asking for an offset
+ // we just keep increasing the amount of blocks.
+ //fmt.Println("[PEER] No common ancestor found, requesting more blocks.")
+ p.blocksRequested = p.blocksRequested * 2
+ p.catchingUp = false
+ p.SyncWithBlocks()
+ }
+
+ for i := msg.Data.Len() - 1; i >= 0; i-- {
+ block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
+ // Do we have this block on our chain? If so we can continue
+ if !p.ethereum.StateManager().BlockChain().HasBlock(block.Hash()) {
+ // We don't have this block, but we do have a block with the same prevHash, diversion time!
+ if p.ethereum.StateManager().BlockChain().HasBlockWithPrevHash(block.PrevHash) {
+ //ethutil.Config.Log.Infof("[PEER] Local and foreign chain have diverted after %x, finding best chain!\n", block.PrevHash)
+ if p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) {
+ return
+ }
+ }
+ }
+ }
+ }
+
for i := msg.Data.Len() - 1; i >= 0; i-- {
block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
p.ethereum.StateManager().PrepareDefault(block)
- err = p.ethereum.StateManager().ProcessBlock(block)
+ err = p.ethereum.StateManager().ProcessBlock(block, false)
if err != nil {
if ethutil.Config.Debug {
ethutil.Config.Log.Infof("[PEER] Block %x failed\n", block.Hash())
ethutil.Config.Log.Infof("[PEER] %v\n", err)
- ethutil.Config.Log.Infoln(block)
}
break
} else {
@@ -313,7 +367,7 @@ func (p *Peer) HandleInbound() {
if ethchain.IsParentErr(err) {
ethutil.Config.Log.Infoln("Attempting to catch up")
p.catchingUp = false
- p.CatchupWithPeer()
+ p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
} else if ethchain.IsValidationErr(err) {
// TODO
}
@@ -326,7 +380,7 @@ func (p *Peer) HandleInbound() {
ethutil.Config.Log.Infof("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
}
p.catchingUp = false
- p.CatchupWithPeer()
+ p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
}
}
case ethwire.MsgTxTy:
@@ -334,7 +388,8 @@ func (p *Peer) HandleInbound() {
// in the TxPool where it will undergo validation and
// processing when a new block is found
for i := 0; i < msg.Data.Len(); i++ {
- p.ethereum.TxPool().QueueTransaction(ethchain.NewTransactionFromData(msg.Data.Get(i).Encode()))
+ tx := ethchain.NewTransactionFromValue(msg.Data.Get(i))
+ p.ethereum.TxPool().QueueTransaction(tx)
}
case ethwire.MsgGetPeersTy:
// Flag this peer as a 'requested of new peers' this to
@@ -373,11 +428,11 @@ func (p *Peer) HandleInbound() {
// Amount of parents in the canonical chain
//amountOfBlocks := msg.Data.Get(l).AsUint()
amountOfBlocks := uint64(100)
+
// Check each SHA block hash from the message and determine whether
// the SHA is in the database
for i := 0; i < l; i++ {
- if data :=
- msg.Data.Get(i).Bytes(); p.ethereum.StateManager().BlockChain().HasBlock(data) {
+ if data := msg.Data.Get(i).Bytes(); p.ethereum.StateManager().BlockChain().HasBlock(data) {
parent = p.ethereum.BlockChain().GetBlock(data)
break
}
@@ -385,9 +440,14 @@ func (p *Peer) HandleInbound() {
// If a parent is found send back a reply
if parent != nil {
+ ethutil.Config.Log.Debugf("[PEER] Found conical block, returning chain from: %x ", parent.Hash())
chain := p.ethereum.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks)
- p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain))
+ if len(chain) > 0 {
+ ethutil.Config.Log.Debugf("[PEER] Returning %d blocks: %x ", len(chain), parent.Hash())
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain))
+ }
} else {
+ ethutil.Config.Log.Debugf("[PEER] Could not find a similar block")
// If no blocks are found we send back a reply with msg not in chain
// and the last hash from get chain
lastHash := msg.Data.Get(l - 1)
@@ -395,8 +455,18 @@ func (p *Peer) HandleInbound() {
p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()}))
}
case ethwire.MsgNotInChainTy:
- ethutil.Config.Log.Infof("Not in chain %x\n", msg.Data)
+ ethutil.Config.Log.Debugf("Not in chain %x\n", msg.Data)
// TODO
+ case ethwire.MsgGetTxsTy:
+ // Get the current transactions of the pool
+ txs := p.ethereum.TxPool().CurrentTransactions()
+ // Get the RlpData values from the txs
+ txsInterface := make([]interface{}, len(txs))
+ for i, tx := range txs {
+ txsInterface[i] = tx.RlpData()
+ }
+ // Broadcast it back to the peer
+ p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface))
// Unofficial but fun nonetheless
case ethwire.MsgTalkTy:
@@ -408,29 +478,6 @@ func (p *Peer) HandleInbound() {
p.Stop()
}
-func packAddr(address, port string) ([]interface{}, uint16) {
- addr := strings.Split(address, ".")
- a, _ := strconv.Atoi(addr[0])
- b, _ := strconv.Atoi(addr[1])
- c, _ := strconv.Atoi(addr[2])
- d, _ := strconv.Atoi(addr[3])
- host := []interface{}{int32(a), int32(b), int32(c), int32(d)}
- prt, _ := strconv.Atoi(port)
-
- return host, uint16(prt)
-}
-
-func unpackAddr(value *ethutil.Value, p uint64) string {
- a := strconv.Itoa(int(value.Get(0).Uint()))
- b := strconv.Itoa(int(value.Get(1).Uint()))
- c := strconv.Itoa(int(value.Get(2).Uint()))
- d := strconv.Itoa(int(value.Get(3).Uint()))
- host := strings.Join([]string{a, b, c, d}, ".")
- port := strconv.Itoa(int(p))
-
- return net.JoinHostPort(host, port)
-}
-
func (p *Peer) Start() {
peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String())
servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
@@ -526,7 +573,8 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
}
// Catch up with the connected peer
- p.CatchupWithPeer()
+ // p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
+ p.SyncWithBlocks()
// Set the peer's caps
p.caps = Caps(c.Get(3).Byte())
@@ -553,17 +601,64 @@ func (p *Peer) String() string {
return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.Version, p.caps)
}
+func (p *Peer) SyncWithBlocks() {
+ if !p.catchingUp {
+ p.catchingUp = true
+ // FIXME: THIS SHOULD NOT BE NEEDED
+ if p.blocksRequested == 0 {
+ p.blocksRequested = 10
+ }
+ blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested)
+
+ var hashes []interface{}
+ for _, block := range blocks {
+ hashes = append(hashes, block.Hash())
+ }
+
+ msgInfo := append(hashes, uint64(50))
+
+ msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo)
+ p.QueueMessage(msg)
+ }
+}
-func (p *Peer) CatchupWithPeer() {
+func (p *Peer) CatchupWithPeer(blockHash []byte) {
if !p.catchingUp {
p.catchingUp = true
- msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{p.ethereum.BlockChain().CurrentBlock.Hash(), uint64(50)})
+ msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(50)})
p.QueueMessage(msg)
ethutil.Config.Log.Debugf("Requesting blockchain %x...\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4])
+
+ msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{})
+ p.QueueMessage(msg)
+ ethutil.Config.Log.Debugln("Requested transactions")
}
}
func (p *Peer) RlpData() []interface{} {
return []interface{}{p.host, p.port, p.pubkey}
}
+
+func packAddr(address, port string) ([]interface{}, uint16) {
+ addr := strings.Split(address, ".")
+ a, _ := strconv.Atoi(addr[0])
+ b, _ := strconv.Atoi(addr[1])
+ c, _ := strconv.Atoi(addr[2])
+ d, _ := strconv.Atoi(addr[3])
+ host := []interface{}{int32(a), int32(b), int32(c), int32(d)}
+ prt, _ := strconv.Atoi(port)
+
+ return host, uint16(prt)
+}
+
+func unpackAddr(value *ethutil.Value, p uint64) string {
+ a := strconv.Itoa(int(value.Get(0).Uint()))
+ b := strconv.Itoa(int(value.Get(1).Uint()))
+ c := strconv.Itoa(int(value.Get(2).Uint()))
+ d := strconv.Itoa(int(value.Get(3).Uint()))
+ host := strings.Join([]string{a, b, c, d}, ".")
+ port := strconv.Itoa(int(p))
+
+ return net.JoinHostPort(host, port)
+}