diff options
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r-- | blockpool/peers.go | 169 |
1 files changed, 82 insertions, 87 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go index c6cade460..eb2ec6a1f 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -10,6 +10,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) // the blockpool's model of a peer @@ -106,9 +108,10 @@ func (self *peers) peerError(id string, code int, format string, params ...inter peer, ok := self.peers[id] self.lock.RUnlock() if ok { - peer.addError(code, format, params) + peer.addError(code, format, params...) + } else { + self.addToBlacklist(id) } - self.addToBlacklist(id) } // record time of offence in blacklist to implement suspension for PeerSuspensionInterval @@ -134,7 +137,11 @@ func (self *peers) suspended(id string) (s bool) { func (self *peer) addError(code int, format string, params ...interface{}) { err := self.errors.New(code, format, params...) self.peerError(err) - self.addToBlacklist(self.id) + if err.Fatal() { + self.addToBlacklist(self.id) + } else { + go self.bp.peers.removePeer(self.id, false) + } } // caller must hold peer lock @@ -143,7 +150,8 @@ func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) { defer self.lock.Unlock() if self.currentBlockHash != currentBlockHash { previousBlockHash := self.currentBlockHash - plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash)) + glog.V(logger.Debug).Infof("addPeer: Update peer <%s> with td %v (was %v) and current block %s (was %v)", self.id, td, self.td, hex(currentBlockHash), hex(previousBlockHash)) + self.td = td self.currentBlockHash = currentBlockHash self.currentBlock = nil @@ -154,41 +162,30 @@ func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) { } func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) { - self.lock.Lock() - currentBlockC := self.currentBlockC - switchC := self.switchC hash := block.Hash() // this happens when block came in a newblock message but // also if sent in a blockmsg (for instance, if we requested, only if we // dont apply on blockrequests the restriction of flood control) currentBlockHash = self.currentBlockHash - if currentBlockHash == hash && self.currentBlock == nil { - // signal to head section process - plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash)) - td = self.td - } else { - plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash)) - } - self.lock.Unlock() - // this must be called without peerlock. - // peerlock held can halt the loop and block on select forever - if td != nil { - select { - case currentBlockC <- block: - case <-switchC: // peer is not best peer + if currentBlockHash == hash { + if self.currentBlock == nil { + // signal to head section process + glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash)) + td = self.td + } else { + glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash)) } } return } // this will use the TD given by the first peer to update peer td, this helps second best peer selection -// :FIXME: node func (self *peer) setChainInfoFromNode(n *node) { // in case best peer is lost block := n.block hash := block.Hash() if n.td != nil && n.td.Cmp(self.td) > 0 { - plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td) + glog.V(logger.Detail).Infof("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td) self.td = n.td self.currentBlockHash = block.Hash() self.parentHash = block.ParentHash() @@ -205,7 +202,7 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { peerCount := len(self.peers) // on first attempt use the best peer if attempts == 0 && self.best != nil { - plog.DebugDetailf("request %v missing blocks from best peer <%s>", len(hashes), self.best.id) + glog.V(logger.Detail).Infof("request %v missing blocks from best peer <%s>", len(hashes), self.best.id) self.best.requestBlocks(hashes) return } @@ -217,11 +214,11 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { indexes := rand.Perm(peerCount)[0:repetitions] sort.Ints(indexes) - plog.DebugDetailf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount) + glog.V(logger.Detail).Infof("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount) for _, peer := range self.peers { if i == indexes[0] { - plog.DebugDetailf("request length: %v", len(hashes)) - plog.DebugDetailf("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id) + glog.V(logger.Detail).Infof("request length: %v", len(hashes)) + glog.V(logger.Detail).Infof("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id) peer.requestBlocks(hashes) indexes = indexes[1:] if len(indexes) == 0 { @@ -248,7 +245,6 @@ func (self *peers) addPeer( self.lock.Lock() defer self.lock.Unlock() - var previousBlockHash common.Hash if self.suspended(id) { suspended = true @@ -259,7 +255,6 @@ func (self *peers) addPeer( // when called on an already connected peer, it means a newBlockMsg is received // peer head info is updated p.setChainInfo(td, currentBlockHash) - // FIXME: only count the same block once self.status.lock.Lock() self.status.values.NewBlocks++ self.status.lock.Unlock() @@ -272,25 +267,25 @@ func (self *peers) addPeer( self.status.values.NewBlocks++ self.status.lock.Unlock() - plog.Debugf("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash)) + glog.V(logger.Debug).Infof("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash)) } // check if peer's current head block is known if self.bp.hasBlock(currentBlockHash) { // peer not ahead - plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) + glog.V(logger.Debug).Infof("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) return false, false } if self.best == p { // new block update for active current best peer -> request hashes - plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash)) + glog.V(logger.Debug).Infof("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash)) if (previousBlockHash != common.Hash{}) { - plog.DebugDetailf("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash)) + glog.V(logger.Detail).Infof("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash)) p.headSectionC <- nil if entry := self.bp.get(previousBlockHash); entry != nil { - plog.DebugDetailf("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash)) + glog.V(logger.Detail).Infof("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash)) self.bp.activateChain(entry.section, p, p.switchC, nil) p.sections = append(p.sections, previousBlockHash) } @@ -309,7 +304,8 @@ func (self *peers) addPeer( self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) + glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) + // fmt.Printf("best peer %v - \n", bestpeer, id) self.bp.switchPeer(bestpeer, p) self.best = p best = true @@ -320,7 +316,7 @@ func (self *peers) addPeer( } // removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects -func (self *peers) removePeer(id string) { +func (self *peers) removePeer(id string, del bool) { self.lock.Lock() defer self.lock.Unlock() @@ -328,10 +324,13 @@ func (self *peers) removePeer(id string) { if !found { return } + p.lock.Lock() + defer p.lock.Unlock() - delete(self.peers, id) - plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td) - + if del { + delete(self.peers, id) + glog.V(logger.Debug).Infof("addPeer: remove peer <%v> (td: %v)", id, p.td) + } // if current best peer is removed, need to find a better one if self.best == p { var newp *peer @@ -339,20 +338,29 @@ func (self *peers) removePeer(id string) { max := self.bp.getTD() // peer with the highest self-acclaimed TD is chosen for _, pp := range self.peers { + // demoted peer's td should be 0 + if pp.id == id { + pp.td = common.Big0 + pp.currentBlockHash = common.Hash{} + continue + } + pp.lock.RLock() if pp.td.Cmp(max) > 0 { max = pp.td newp = pp } + pp.lock.RUnlock() } if newp != nil { self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) + glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) } else { - plog.Warnln("addPeer: no suitable peers found") + glog.V(logger.Warn).Infof("addPeer: no suitable peers found") } self.best = newp + // fmt.Printf("remove peer %v - %v\n", p.id, newp) self.bp.switchPeer(p, newp) } } @@ -363,16 +371,17 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { // first quit AddBlockHashes, requestHeadSection and activateChain // by closing the old peer's switchC channel if oldp != nil { - plog.DebugDetailf("<%s> quit peer processes", oldp.id) + glog.V(logger.Detail).Infof("<%s> quit peer processes", oldp.id) + // fmt.Printf("close %v - %v\n", oldp.id, newp) close(oldp.switchC) } if newp != nil { - // newp.idleC = make(chan bool) - // newp.switchC = make(chan bool) // if new best peer has no head section yet, create it and run it // otherwise head section is an element of peer.sections + newp.idleC = make(chan bool) + newp.switchC = make(chan bool) if newp.headSection == nil { - plog.DebugDetailf("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash)) + glog.V(logger.Detail).Infof("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash)) if newp.idle { self.wg.Add(1) @@ -388,15 +397,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { } }() - } else { - newp.idleC = make(chan bool) - newp.switchC = make(chan bool) } var connected = make(map[common.Hash]*section) var sections []common.Hash for _, hash := range newp.sections { - plog.DebugDetailf("activate chain starting from section [%s]", hex(hash)) + glog.V(logger.Detail).Infof("activate chain starting from section [%s]", hex(hash)) // if section not connected (ie, top of a contiguous sequence of sections) if connected[hash] == nil { // if not deleted, then reread from pool (it can be orphaned top half of a split section) @@ -407,7 +413,7 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { } } } - plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections)) + glog.V(logger.Detail).Infof("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections)) // need to lock now that newp is exposed to section processesr newp.lock.Lock() newp.sections = sections @@ -416,7 +422,7 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { // finally deactivate section process for sections where newp didnt activate // newp activating section process changes the quit channel for this reason if oldp != nil { - plog.DebugDetailf("<%s> quit section processes", oldp.id) + glog.V(logger.Detail).Infof("<%s> quit section processes", oldp.id) close(oldp.idleC) } } @@ -438,7 +444,7 @@ func (self *peers) getPeer(id string) (p *peer, best bool) { func (self *peer) handleSection(sec *section) { self.lock.Lock() defer self.lock.Unlock() - plog.DebugDetailf("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec)) + glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec)) self.headSection = sec self.blockHashesRequestTimer = nil @@ -453,7 +459,7 @@ func (self *peer) handleSection(sec *section) { self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout) self.bestIdleTimer = nil - plog.DebugDetailf("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash)) } else { if !self.idle { self.idle = true @@ -462,12 +468,14 @@ func (self *peer) handleSection(sec *section) { self.headInfoTimer = nil self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout) - plog.DebugDetailf("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec)) } } func (self *peer) getCurrentBlock(currentBlock *types.Block) { // called by update or after AddBlock signals that head block of current peer is received + self.lock.Lock() + defer self.lock.Unlock() if currentBlock == nil { if entry := self.bp.get(self.currentBlockHash); entry != nil { entry.node.lock.Lock() @@ -475,22 +483,20 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) { entry.node.lock.Unlock() } if currentBlock != nil { - plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash)) } else { - plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash)) self.requestBlocks([]common.Hash{self.currentBlockHash}) self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval) return } } else { - plog.DebugDetailf("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash())) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash())) } - self.lock.Lock() - defer self.lock.Unlock() self.currentBlock = currentBlock self.parentHash = currentBlock.ParentHash() - plog.DebugDetailf("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash)) self.blockHashesRequestTimer = time.After(0) self.blocksRequestTimer = nil } @@ -500,7 +506,7 @@ func (self *peer) getBlockHashes() bool { defer self.lock.Unlock() //if connecting parent is found if self.bp.hasBlock(self.parentHash) { - plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash)) + glog.V(logger.Detail).Infof("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash)) err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock})) self.bp.status.lock.Lock() @@ -510,16 +516,15 @@ func (self *peer) getBlockHashes() bool { self.addError(ErrInvalidBlock, "%v", err) self.bp.status.badPeers[self.id]++ } else { - /* @zelig: Commented out temp untill the rest of the network has been fixed. // XXX added currentBlock check (?) if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() { - plog.DebugDetailf("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td) + glog.V(logger.Detail).Infof("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td) if self.td.Cmp(self.currentBlock.Td) != 0 { - self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash) + self.addError(ErrIncorrectTD, "on block %x %v =?= %v", hex(self.parentHash), self.td, self.currentBlock.Td) self.bp.status.badPeers[self.id]++ } } - */ + headKey := self.parentHash height := self.bp.status.chain[headKey] + 1 self.bp.status.chain[self.currentBlockHash] = height @@ -532,21 +537,20 @@ func (self *peer) getBlockHashes() bool { } else { if parent := self.bp.get(self.parentHash); parent != nil { if self.bp.get(self.currentBlockHash) == nil { - plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash)) - n := &node{ - hash: self.currentBlockHash, - block: self.currentBlock, - hashBy: self.id, - blockBy: self.id, - td: self.td, + glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash)) + self.bp.nodeCacheLock.Lock() + n, ok := self.bp.nodeCache[self.currentBlockHash] + if !ok { + panic("not found in nodeCache") } + self.bp.nodeCacheLock.Unlock() self.bp.newSection([]*node{n}).activate(self) } else { - plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section)) + glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section)) self.bp.activateChain(parent.section, self, self.switchC, nil) } } else { - plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection)) + glog.V(logger.Detail).Infof("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection)) self.requestBlockHashes(self.currentBlockHash) self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval) return false @@ -565,15 +569,6 @@ func (self *peer) getBlockHashes() bool { // main loop for head section process func (self *peer) run() { - self.lock.Lock() - self.switchC = make(chan bool) - self.idleC = make(chan bool) - switchC := self.switchC - plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash)) - self.lock.Unlock() - - self.blockHashesRequestTimer = nil - self.blocksRequestTimer = time.After(0) self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout) self.bestIdleTimer = nil @@ -585,7 +580,7 @@ LOOP: select { // to minitor section process behaviour case <-ping.C: - plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) + glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) // signal from AddBlockHashes that head section for current best peer is created // if sec == nil, it signals that chain info has updated (new block message) @@ -614,12 +609,12 @@ LOOP: // there is no persistence here, so GC will just take care of cleaning up // signal for peer switch, quit - case <-switchC: + case <-self.switchC: var complete = "incomplete " if self.idle { complete = "complete" } - plog.Debugf("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete) + glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete) break LOOP // global quit for blockpool @@ -633,7 +628,7 @@ LOOP: self.bp.status.lock.Lock() self.bp.status.badPeers[self.id]++ self.bp.status.lock.Unlock() - plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection)) + glog.V(logger.Detail).Infof("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection)) } } |