aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/peers.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r--blockpool/peers.go169
1 files changed, 82 insertions, 87 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go
index c6cade460..eb2ec6a1f 100644
--- a/blockpool/peers.go
+++ b/blockpool/peers.go
@@ -10,6 +10,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
)
// the blockpool's model of a peer
@@ -106,9 +108,10 @@ func (self *peers) peerError(id string, code int, format string, params ...inter
peer, ok := self.peers[id]
self.lock.RUnlock()
if ok {
- peer.addError(code, format, params)
+ peer.addError(code, format, params...)
+ } else {
+ self.addToBlacklist(id)
}
- self.addToBlacklist(id)
}
// record time of offence in blacklist to implement suspension for PeerSuspensionInterval
@@ -134,7 +137,11 @@ func (self *peers) suspended(id string) (s bool) {
func (self *peer) addError(code int, format string, params ...interface{}) {
err := self.errors.New(code, format, params...)
self.peerError(err)
- self.addToBlacklist(self.id)
+ if err.Fatal() {
+ self.addToBlacklist(self.id)
+ } else {
+ go self.bp.peers.removePeer(self.id, false)
+ }
}
// caller must hold peer lock
@@ -143,7 +150,8 @@ func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
defer self.lock.Unlock()
if self.currentBlockHash != currentBlockHash {
previousBlockHash := self.currentBlockHash
- plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash))
+ glog.V(logger.Debug).Infof("addPeer: Update peer <%s> with td %v (was %v) and current block %s (was %v)", self.id, td, self.td, hex(currentBlockHash), hex(previousBlockHash))
+
self.td = td
self.currentBlockHash = currentBlockHash
self.currentBlock = nil
@@ -154,41 +162,30 @@ func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
}
func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
- self.lock.Lock()
- currentBlockC := self.currentBlockC
- switchC := self.switchC
hash := block.Hash()
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
currentBlockHash = self.currentBlockHash
- if currentBlockHash == hash && self.currentBlock == nil {
- // signal to head section process
- plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
- td = self.td
- } else {
- plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
- }
- self.lock.Unlock()
- // this must be called without peerlock.
- // peerlock held can halt the loop and block on select forever
- if td != nil {
- select {
- case currentBlockC <- block:
- case <-switchC: // peer is not best peer
+ if currentBlockHash == hash {
+ if self.currentBlock == nil {
+ // signal to head section process
+ glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
+ td = self.td
+ } else {
+ glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
}
}
return
}
// this will use the TD given by the first peer to update peer td, this helps second best peer selection
-// :FIXME: node
func (self *peer) setChainInfoFromNode(n *node) {
// in case best peer is lost
block := n.block
hash := block.Hash()
if n.td != nil && n.td.Cmp(self.td) > 0 {
- plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
+ glog.V(logger.Detail).Infof("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
self.td = n.td
self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash()
@@ -205,7 +202,7 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
peerCount := len(self.peers)
// on first attempt use the best peer
if attempts == 0 && self.best != nil {
- plog.DebugDetailf("request %v missing blocks from best peer <%s>", len(hashes), self.best.id)
+ glog.V(logger.Detail).Infof("request %v missing blocks from best peer <%s>", len(hashes), self.best.id)
self.best.requestBlocks(hashes)
return
}
@@ -217,11 +214,11 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
indexes := rand.Perm(peerCount)[0:repetitions]
sort.Ints(indexes)
- plog.DebugDetailf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
+ glog.V(logger.Detail).Infof("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
for _, peer := range self.peers {
if i == indexes[0] {
- plog.DebugDetailf("request length: %v", len(hashes))
- plog.DebugDetailf("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id)
+ glog.V(logger.Detail).Infof("request length: %v", len(hashes))
+ glog.V(logger.Detail).Infof("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id)
peer.requestBlocks(hashes)
indexes = indexes[1:]
if len(indexes) == 0 {
@@ -248,7 +245,6 @@ func (self *peers) addPeer(
self.lock.Lock()
defer self.lock.Unlock()
-
var previousBlockHash common.Hash
if self.suspended(id) {
suspended = true
@@ -259,7 +255,6 @@ func (self *peers) addPeer(
// when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated
p.setChainInfo(td, currentBlockHash)
- // FIXME: only count the same block once
self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
@@ -272,25 +267,25 @@ func (self *peers) addPeer(
self.status.values.NewBlocks++
self.status.lock.Unlock()
- plog.Debugf("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash))
+ glog.V(logger.Debug).Infof("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash))
}
// check if peer's current head block is known
if self.bp.hasBlock(currentBlockHash) {
// peer not ahead
- plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
+ glog.V(logger.Debug).Infof("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
return false, false
}
if self.best == p {
// new block update for active current best peer -> request hashes
- plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
+ glog.V(logger.Debug).Infof("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
if (previousBlockHash != common.Hash{}) {
- plog.DebugDetailf("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash))
+ glog.V(logger.Detail).Infof("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash))
p.headSectionC <- nil
if entry := self.bp.get(previousBlockHash); entry != nil {
- plog.DebugDetailf("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash))
+ glog.V(logger.Detail).Infof("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash))
self.bp.activateChain(entry.section, p, p.switchC, nil)
p.sections = append(p.sections, previousBlockHash)
}
@@ -309,7 +304,8 @@ func (self *peers) addPeer(
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
- plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD)
+ glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD)
+ // fmt.Printf("best peer %v - \n", bestpeer, id)
self.bp.switchPeer(bestpeer, p)
self.best = p
best = true
@@ -320,7 +316,7 @@ func (self *peers) addPeer(
}
// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
-func (self *peers) removePeer(id string) {
+func (self *peers) removePeer(id string, del bool) {
self.lock.Lock()
defer self.lock.Unlock()
@@ -328,10 +324,13 @@ func (self *peers) removePeer(id string) {
if !found {
return
}
+ p.lock.Lock()
+ defer p.lock.Unlock()
- delete(self.peers, id)
- plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td)
-
+ if del {
+ delete(self.peers, id)
+ glog.V(logger.Debug).Infof("addPeer: remove peer <%v> (td: %v)", id, p.td)
+ }
// if current best peer is removed, need to find a better one
if self.best == p {
var newp *peer
@@ -339,20 +338,29 @@ func (self *peers) removePeer(id string) {
max := self.bp.getTD()
// peer with the highest self-acclaimed TD is chosen
for _, pp := range self.peers {
+ // demoted peer's td should be 0
+ if pp.id == id {
+ pp.td = common.Big0
+ pp.currentBlockHash = common.Hash{}
+ continue
+ }
+ pp.lock.RLock()
if pp.td.Cmp(max) > 0 {
max = pp.td
newp = pp
}
+ pp.lock.RUnlock()
}
if newp != nil {
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
- plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td)
+ glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td)
} else {
- plog.Warnln("addPeer: no suitable peers found")
+ glog.V(logger.Warn).Infof("addPeer: no suitable peers found")
}
self.best = newp
+ // fmt.Printf("remove peer %v - %v\n", p.id, newp)
self.bp.switchPeer(p, newp)
}
}
@@ -363,16 +371,17 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
// first quit AddBlockHashes, requestHeadSection and activateChain
// by closing the old peer's switchC channel
if oldp != nil {
- plog.DebugDetailf("<%s> quit peer processes", oldp.id)
+ glog.V(logger.Detail).Infof("<%s> quit peer processes", oldp.id)
+ // fmt.Printf("close %v - %v\n", oldp.id, newp)
close(oldp.switchC)
}
if newp != nil {
- // newp.idleC = make(chan bool)
- // newp.switchC = make(chan bool)
// if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections
+ newp.idleC = make(chan bool)
+ newp.switchC = make(chan bool)
if newp.headSection == nil {
- plog.DebugDetailf("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash))
+ glog.V(logger.Detail).Infof("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash))
if newp.idle {
self.wg.Add(1)
@@ -388,15 +397,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
}
}()
- } else {
- newp.idleC = make(chan bool)
- newp.switchC = make(chan bool)
}
var connected = make(map[common.Hash]*section)
var sections []common.Hash
for _, hash := range newp.sections {
- plog.DebugDetailf("activate chain starting from section [%s]", hex(hash))
+ glog.V(logger.Detail).Infof("activate chain starting from section [%s]", hex(hash))
// if section not connected (ie, top of a contiguous sequence of sections)
if connected[hash] == nil {
// if not deleted, then reread from pool (it can be orphaned top half of a split section)
@@ -407,7 +413,7 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
}
}
}
- plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
+ glog.V(logger.Detail).Infof("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
// need to lock now that newp is exposed to section processesr
newp.lock.Lock()
newp.sections = sections
@@ -416,7 +422,7 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
// finally deactivate section process for sections where newp didnt activate
// newp activating section process changes the quit channel for this reason
if oldp != nil {
- plog.DebugDetailf("<%s> quit section processes", oldp.id)
+ glog.V(logger.Detail).Infof("<%s> quit section processes", oldp.id)
close(oldp.idleC)
}
}
@@ -438,7 +444,7 @@ func (self *peers) getPeer(id string) (p *peer, best bool) {
func (self *peer) handleSection(sec *section) {
self.lock.Lock()
defer self.lock.Unlock()
- plog.DebugDetailf("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec))
self.headSection = sec
self.blockHashesRequestTimer = nil
@@ -453,7 +459,7 @@ func (self *peer) handleSection(sec *section) {
self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout)
self.bestIdleTimer = nil
- plog.DebugDetailf("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
} else {
if !self.idle {
self.idle = true
@@ -462,12 +468,14 @@ func (self *peer) handleSection(sec *section) {
self.headInfoTimer = nil
self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout)
- plog.DebugDetailf("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec))
}
}
func (self *peer) getCurrentBlock(currentBlock *types.Block) {
// called by update or after AddBlock signals that head block of current peer is received
+ self.lock.Lock()
+ defer self.lock.Unlock()
if currentBlock == nil {
if entry := self.bp.get(self.currentBlockHash); entry != nil {
entry.node.lock.Lock()
@@ -475,22 +483,20 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) {
entry.node.lock.Unlock()
}
if currentBlock != nil {
- plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
} else {
- plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
self.requestBlocks([]common.Hash{self.currentBlockHash})
self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
return
}
} else {
- plog.DebugDetailf("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash()))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash()))
}
- self.lock.Lock()
- defer self.lock.Unlock()
self.currentBlock = currentBlock
self.parentHash = currentBlock.ParentHash()
- plog.DebugDetailf("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash))
self.blockHashesRequestTimer = time.After(0)
self.blocksRequestTimer = nil
}
@@ -500,7 +506,7 @@ func (self *peer) getBlockHashes() bool {
defer self.lock.Unlock()
//if connecting parent is found
if self.bp.hasBlock(self.parentHash) {
- plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock}))
self.bp.status.lock.Lock()
@@ -510,16 +516,15 @@ func (self *peer) getBlockHashes() bool {
self.addError(ErrInvalidBlock, "%v", err)
self.bp.status.badPeers[self.id]++
} else {
- /* @zelig: Commented out temp untill the rest of the network has been fixed.
// XXX added currentBlock check (?)
if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() {
- plog.DebugDetailf("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td)
+ glog.V(logger.Detail).Infof("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td)
if self.td.Cmp(self.currentBlock.Td) != 0 {
- self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash)
+ self.addError(ErrIncorrectTD, "on block %x %v =?= %v", hex(self.parentHash), self.td, self.currentBlock.Td)
self.bp.status.badPeers[self.id]++
}
}
- */
+
headKey := self.parentHash
height := self.bp.status.chain[headKey] + 1
self.bp.status.chain[self.currentBlockHash] = height
@@ -532,21 +537,20 @@ func (self *peer) getBlockHashes() bool {
} else {
if parent := self.bp.get(self.parentHash); parent != nil {
if self.bp.get(self.currentBlockHash) == nil {
- plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash))
- n := &node{
- hash: self.currentBlockHash,
- block: self.currentBlock,
- hashBy: self.id,
- blockBy: self.id,
- td: self.td,
+ glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash))
+ self.bp.nodeCacheLock.Lock()
+ n, ok := self.bp.nodeCache[self.currentBlockHash]
+ if !ok {
+ panic("not found in nodeCache")
}
+ self.bp.nodeCacheLock.Unlock()
self.bp.newSection([]*node{n}).activate(self)
} else {
- plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
self.bp.activateChain(parent.section, self, self.switchC, nil)
}
} else {
- plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
self.requestBlockHashes(self.currentBlockHash)
self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
return false
@@ -565,15 +569,6 @@ func (self *peer) getBlockHashes() bool {
// main loop for head section process
func (self *peer) run() {
- self.lock.Lock()
- self.switchC = make(chan bool)
- self.idleC = make(chan bool)
- switchC := self.switchC
- plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash))
- self.lock.Unlock()
-
- self.blockHashesRequestTimer = nil
-
self.blocksRequestTimer = time.After(0)
self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout)
self.bestIdleTimer = nil
@@ -585,7 +580,7 @@ LOOP:
select {
// to minitor section process behaviour
case <-ping.C:
- plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
+ glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
// signal from AddBlockHashes that head section for current best peer is created
// if sec == nil, it signals that chain info has updated (new block message)
@@ -614,12 +609,12 @@ LOOP:
// there is no persistence here, so GC will just take care of cleaning up
// signal for peer switch, quit
- case <-switchC:
+ case <-self.switchC:
var complete = "incomplete "
if self.idle {
complete = "complete"
}
- plog.Debugf("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
+ glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
break LOOP
// global quit for blockpool
@@ -633,7 +628,7 @@ LOOP:
self.bp.status.lock.Lock()
self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock()
- plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
+ glog.V(logger.Detail).Infof("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
}
}