aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/peers.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r--blockpool/peers.go639
1 files changed, 0 insertions, 639 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go
deleted file mode 100644
index eb2ec6a1f..000000000
--- a/blockpool/peers.go
+++ /dev/null
@@ -1,639 +0,0 @@
-package blockpool
-
-import (
- "math/big"
- "math/rand"
- "sort"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/errs"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
-)
-
-// the blockpool's model of a peer
-type peer struct {
- lock sync.RWMutex
-
- // last known blockchain status
- td *big.Int
- tdAdvertised bool
- currentBlockHash common.Hash
- currentBlock *types.Block
- parentHash common.Hash
- headSection *section
-
- id string
-
- // peer callbacks
- requestBlockHashes func(common.Hash) error
- requestBlocks func([]common.Hash) error
- peerError func(*errs.Error)
- errors *errs.Errors
-
- sections []common.Hash
-
- // channels to push new head block and head section for peer a
- currentBlockC chan *types.Block
- headSectionC chan *section
-
- // channels to signal peer switch and peer quit to section processes
- idleC chan bool
- switchC chan bool
-
- bp *BlockPool
-
- // timers for head section process
- blockHashesRequestTimer <-chan time.Time
- blocksRequestTimer <-chan time.Time
- headInfoTimer <-chan time.Time
- bestIdleTimer <-chan time.Time
-
- addToBlacklist func(id string)
-
- idle bool
-}
-
-// peers is the component keeping a record of peers in a hashmap
-//
-type peers struct {
- lock sync.RWMutex
- bllock sync.Mutex
-
- bp *BlockPool
- errors *errs.Errors
- peers map[string]*peer
- best *peer
- status *status
- blacklist map[string]time.Time
-}
-
-// peer constructor
-func (self *peers) newPeer(
- td *big.Int,
- currentBlockHash common.Hash,
- id string,
- requestBlockHashes func(common.Hash) error,
- requestBlocks func([]common.Hash) error,
- peerError func(*errs.Error),
-) (p *peer) {
-
- p = &peer{
- errors: self.errors,
- td: td,
- currentBlockHash: currentBlockHash,
- id: id,
- requestBlockHashes: requestBlockHashes,
- requestBlocks: requestBlocks,
- peerError: peerError,
- currentBlockC: make(chan *types.Block),
- headSectionC: make(chan *section),
- switchC: make(chan bool),
- bp: self.bp,
- idle: true,
- addToBlacklist: self.addToBlacklist,
- }
- close(p.switchC) //! hack :((((
- // at creation the peer is recorded in the peer pool
- self.peers[id] = p
- return
-}
-
-// dispatches an error to a peer if still connected, adds it to the blacklist
-func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
- self.lock.RLock()
- peer, ok := self.peers[id]
- self.lock.RUnlock()
- if ok {
- peer.addError(code, format, params...)
- } else {
- self.addToBlacklist(id)
- }
-}
-
-// record time of offence in blacklist to implement suspension for PeerSuspensionInterval
-func (self *peers) addToBlacklist(id string) {
- self.bllock.Lock()
- defer self.bllock.Unlock()
- self.blacklist[id] = time.Now()
-}
-
-// suspended checks if peer is still suspended, caller should hold peers.lock
-func (self *peers) suspended(id string) (s bool) {
- self.bllock.Lock()
- defer self.bllock.Unlock()
- if suspendedAt, ok := self.blacklist[id]; ok {
- if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s {
- // no longer suspended, delete entry
- delete(self.blacklist, id)
- }
- }
- return
-}
-
-func (self *peer) addError(code int, format string, params ...interface{}) {
- err := self.errors.New(code, format, params...)
- self.peerError(err)
- if err.Fatal() {
- self.addToBlacklist(self.id)
- } else {
- go self.bp.peers.removePeer(self.id, false)
- }
-}
-
-// caller must hold peer lock
-func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
- self.lock.Lock()
- defer self.lock.Unlock()
- if self.currentBlockHash != currentBlockHash {
- previousBlockHash := self.currentBlockHash
- glog.V(logger.Debug).Infof("addPeer: Update peer <%s> with td %v (was %v) and current block %s (was %v)", self.id, td, self.td, hex(currentBlockHash), hex(previousBlockHash))
-
- self.td = td
- self.currentBlockHash = currentBlockHash
- self.currentBlock = nil
- self.parentHash = common.Hash{}
- self.headSection = nil
- }
- self.tdAdvertised = true
-}
-
-func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
- hash := block.Hash()
- // this happens when block came in a newblock message but
- // also if sent in a blockmsg (for instance, if we requested, only if we
- // dont apply on blockrequests the restriction of flood control)
- currentBlockHash = self.currentBlockHash
- if currentBlockHash == hash {
- if self.currentBlock == nil {
- // signal to head section process
- glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
- td = self.td
- } else {
- glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
- }
- }
- return
-}
-
-// this will use the TD given by the first peer to update peer td, this helps second best peer selection
-func (self *peer) setChainInfoFromNode(n *node) {
- // in case best peer is lost
- block := n.block
- hash := block.Hash()
- if n.td != nil && n.td.Cmp(self.td) > 0 {
- glog.V(logger.Detail).Infof("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
- self.td = n.td
- self.currentBlockHash = block.Hash()
- self.parentHash = block.ParentHash()
- self.currentBlock = block
- self.headSection = nil
- }
-}
-
-// distribute block request among known peers
-func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
- self.lock.RLock()
-
- defer self.lock.RUnlock()
- peerCount := len(self.peers)
- // on first attempt use the best peer
- if attempts == 0 && self.best != nil {
- glog.V(logger.Detail).Infof("request %v missing blocks from best peer <%s>", len(hashes), self.best.id)
- self.best.requestBlocks(hashes)
- return
- }
- repetitions := self.bp.Config.BlocksRequestRepetition
- if repetitions > peerCount {
- repetitions = peerCount
- }
- i := 0
- indexes := rand.Perm(peerCount)[0:repetitions]
- sort.Ints(indexes)
-
- glog.V(logger.Detail).Infof("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
- for _, peer := range self.peers {
- if i == indexes[0] {
- glog.V(logger.Detail).Infof("request length: %v", len(hashes))
- glog.V(logger.Detail).Infof("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id)
- peer.requestBlocks(hashes)
- indexes = indexes[1:]
- if len(indexes) == 0 {
- break
- }
- }
- i++
- }
- self.bp.putHashSlice(hashes)
-}
-
-// addPeer implements the logic for blockpool.AddPeer
-// returns 2 bool values
-// 1. true iff peer is promoted as best peer in the pool
-// 2. true iff peer is still suspended
-func (self *peers) addPeer(
- td *big.Int,
- currentBlockHash common.Hash,
- id string,
- requestBlockHashes func(common.Hash) error,
- requestBlocks func([]common.Hash) error,
- peerError func(*errs.Error),
-) (best bool, suspended bool) {
-
- self.lock.Lock()
- defer self.lock.Unlock()
- var previousBlockHash common.Hash
- if self.suspended(id) {
- suspended = true
- return
- }
- p, found := self.peers[id]
- if found {
- // when called on an already connected peer, it means a newBlockMsg is received
- // peer head info is updated
- p.setChainInfo(td, currentBlockHash)
- self.status.lock.Lock()
- self.status.values.NewBlocks++
- self.status.lock.Unlock()
- } else {
- p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
-
- self.status.lock.Lock()
-
- self.status.peers[id]++
- self.status.values.NewBlocks++
- self.status.lock.Unlock()
-
- glog.V(logger.Debug).Infof("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash))
- }
-
- // check if peer's current head block is known
- if self.bp.hasBlock(currentBlockHash) {
- // peer not ahead
- glog.V(logger.Debug).Infof("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
- return false, false
- }
-
- if self.best == p {
- // new block update for active current best peer -> request hashes
- glog.V(logger.Debug).Infof("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
-
- if (previousBlockHash != common.Hash{}) {
- glog.V(logger.Detail).Infof("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash))
- p.headSectionC <- nil
- if entry := self.bp.get(previousBlockHash); entry != nil {
- glog.V(logger.Detail).Infof("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash))
- self.bp.activateChain(entry.section, p, p.switchC, nil)
- p.sections = append(p.sections, previousBlockHash)
- }
- }
- best = true
- } else {
- // baseline is our own TD
- currentTD := self.bp.getTD()
- bestpeer := self.best
- if bestpeer != nil {
- bestpeer.lock.RLock()
- defer bestpeer.lock.RUnlock()
- currentTD = self.best.td
- }
- if td.Cmp(currentTD) > 0 {
- self.status.lock.Lock()
- self.status.bestPeers[p.id]++
- self.status.lock.Unlock()
- glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD)
- // fmt.Printf("best peer %v - \n", bestpeer, id)
- self.bp.switchPeer(bestpeer, p)
- self.best = p
- best = true
- }
- }
-
- return
-}
-
-// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
-func (self *peers) removePeer(id string, del bool) {
- self.lock.Lock()
- defer self.lock.Unlock()
-
- p, found := self.peers[id]
- if !found {
- return
- }
- p.lock.Lock()
- defer p.lock.Unlock()
-
- if del {
- delete(self.peers, id)
- glog.V(logger.Debug).Infof("addPeer: remove peer <%v> (td: %v)", id, p.td)
- }
- // if current best peer is removed, need to find a better one
- if self.best == p {
- var newp *peer
- // only peers that are ahead of us are considered
- max := self.bp.getTD()
- // peer with the highest self-acclaimed TD is chosen
- for _, pp := range self.peers {
- // demoted peer's td should be 0
- if pp.id == id {
- pp.td = common.Big0
- pp.currentBlockHash = common.Hash{}
- continue
- }
- pp.lock.RLock()
- if pp.td.Cmp(max) > 0 {
- max = pp.td
- newp = pp
- }
- pp.lock.RUnlock()
- }
- if newp != nil {
- self.status.lock.Lock()
- self.status.bestPeers[p.id]++
- self.status.lock.Unlock()
- glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td)
- } else {
- glog.V(logger.Warn).Infof("addPeer: no suitable peers found")
- }
- self.best = newp
- // fmt.Printf("remove peer %v - %v\n", p.id, newp)
- self.bp.switchPeer(p, newp)
- }
-}
-
-// switchPeer launches section processes
-func (self *BlockPool) switchPeer(oldp, newp *peer) {
-
- // first quit AddBlockHashes, requestHeadSection and activateChain
- // by closing the old peer's switchC channel
- if oldp != nil {
- glog.V(logger.Detail).Infof("<%s> quit peer processes", oldp.id)
- // fmt.Printf("close %v - %v\n", oldp.id, newp)
- close(oldp.switchC)
- }
- if newp != nil {
- // if new best peer has no head section yet, create it and run it
- // otherwise head section is an element of peer.sections
- newp.idleC = make(chan bool)
- newp.switchC = make(chan bool)
- if newp.headSection == nil {
- glog.V(logger.Detail).Infof("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash))
-
- if newp.idle {
- self.wg.Add(1)
- newp.idle = false
- self.syncing()
- }
-
- go func() {
- newp.run()
- if !newp.idle {
- self.wg.Done()
- newp.idle = true
- }
- }()
-
- }
-
- var connected = make(map[common.Hash]*section)
- var sections []common.Hash
- for _, hash := range newp.sections {
- glog.V(logger.Detail).Infof("activate chain starting from section [%s]", hex(hash))
- // if section not connected (ie, top of a contiguous sequence of sections)
- if connected[hash] == nil {
- // if not deleted, then reread from pool (it can be orphaned top half of a split section)
- if entry := self.get(hash); entry != nil {
- self.activateChain(entry.section, newp, newp.switchC, connected)
- connected[hash] = entry.section
- sections = append(sections, hash)
- }
- }
- }
- glog.V(logger.Detail).Infof("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
- // need to lock now that newp is exposed to section processesr
- newp.lock.Lock()
- newp.sections = sections
- newp.lock.Unlock()
- }
- // finally deactivate section process for sections where newp didnt activate
- // newp activating section process changes the quit channel for this reason
- if oldp != nil {
- glog.V(logger.Detail).Infof("<%s> quit section processes", oldp.id)
- close(oldp.idleC)
- }
-}
-
-// getPeer looks up peer by id, returns peer and a bool value
-// that is true iff peer is current best peer
-func (self *peers) getPeer(id string) (p *peer, best bool) {
- self.lock.RLock()
- defer self.lock.RUnlock()
- if self.best != nil && self.best.id == id {
- return self.best, true
- }
- p = self.peers[id]
- return
-}
-
-// head section process
-
-func (self *peer) handleSection(sec *section) {
- self.lock.Lock()
- defer self.lock.Unlock()
- glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec))
-
- self.headSection = sec
- self.blockHashesRequestTimer = nil
-
- if sec == nil {
- if self.idle {
- self.idle = false
- self.bp.wg.Add(1)
- self.bp.syncing()
- }
-
- self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout)
- self.bestIdleTimer = nil
-
- glog.V(logger.Detail).Infof("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
- } else {
- if !self.idle {
- self.idle = true
- self.bp.wg.Done()
- }
-
- self.headInfoTimer = nil
- self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout)
- glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec))
- }
-}
-
-func (self *peer) getCurrentBlock(currentBlock *types.Block) {
- // called by update or after AddBlock signals that head block of current peer is received
- self.lock.Lock()
- defer self.lock.Unlock()
- if currentBlock == nil {
- if entry := self.bp.get(self.currentBlockHash); entry != nil {
- entry.node.lock.Lock()
- currentBlock = entry.node.block
- entry.node.lock.Unlock()
- }
- if currentBlock != nil {
- glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
- } else {
- glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
- self.requestBlocks([]common.Hash{self.currentBlockHash})
- self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
- return
- }
- } else {
- glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash()))
- }
-
- self.currentBlock = currentBlock
- self.parentHash = currentBlock.ParentHash()
- glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash))
- self.blockHashesRequestTimer = time.After(0)
- self.blocksRequestTimer = nil
-}
-
-func (self *peer) getBlockHashes() bool {
- self.lock.Lock()
- defer self.lock.Unlock()
- //if connecting parent is found
- if self.bp.hasBlock(self.parentHash) {
- glog.V(logger.Detail).Infof("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
- err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock}))
-
- self.bp.status.lock.Lock()
- self.bp.status.values.BlocksInChain++
- self.bp.status.values.BlocksInPool--
- if err != nil {
- self.addError(ErrInvalidBlock, "%v", err)
- self.bp.status.badPeers[self.id]++
- } else {
- // XXX added currentBlock check (?)
- if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() {
- glog.V(logger.Detail).Infof("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td)
- if self.td.Cmp(self.currentBlock.Td) != 0 {
- self.addError(ErrIncorrectTD, "on block %x %v =?= %v", hex(self.parentHash), self.td, self.currentBlock.Td)
- self.bp.status.badPeers[self.id]++
- }
- }
-
- headKey := self.parentHash
- height := self.bp.status.chain[headKey] + 1
- self.bp.status.chain[self.currentBlockHash] = height
- if height > self.bp.status.values.LongestChain {
- self.bp.status.values.LongestChain = height
- }
- delete(self.bp.status.chain, headKey)
- }
- self.bp.status.lock.Unlock()
- } else {
- if parent := self.bp.get(self.parentHash); parent != nil {
- if self.bp.get(self.currentBlockHash) == nil {
- glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash))
- self.bp.nodeCacheLock.Lock()
- n, ok := self.bp.nodeCache[self.currentBlockHash]
- if !ok {
- panic("not found in nodeCache")
- }
- self.bp.nodeCacheLock.Unlock()
- self.bp.newSection([]*node{n}).activate(self)
- } else {
- glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
- self.bp.activateChain(parent.section, self, self.switchC, nil)
- }
- } else {
- glog.V(logger.Detail).Infof("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
- self.requestBlockHashes(self.currentBlockHash)
- self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
- return false
- }
- }
- self.blockHashesRequestTimer = nil
- if !self.idle {
- self.idle = true
- self.headInfoTimer = nil
- self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout)
- self.bp.wg.Done()
- }
- return true
-}
-
-// main loop for head section process
-func (self *peer) run() {
-
- self.blocksRequestTimer = time.After(0)
- self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout)
- self.bestIdleTimer = nil
-
- var ping = time.NewTicker(5 * time.Second)
-
-LOOP:
- for {
- select {
- // to minitor section process behaviour
- case <-ping.C:
- glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
-
- // signal from AddBlockHashes that head section for current best peer is created
- // if sec == nil, it signals that chain info has updated (new block message)
- case sec := <-self.headSectionC:
- self.handleSection(sec)
-
- // periodic check for block hashes or parent block/section
- case <-self.blockHashesRequestTimer:
- self.getBlockHashes()
-
- // signal from AddBlock that head block of current best peer has been received
- case currentBlock := <-self.currentBlockC:
- self.getCurrentBlock(currentBlock)
-
- // keep requesting until found or timed out
- case <-self.blocksRequestTimer:
- self.getCurrentBlock(nil)
-
- // quitting on timeout
- case <-self.headInfoTimer:
- self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block (td: %v, head: %s)", self.td, hex(self.currentBlockHash)))
-
- self.bp.status.lock.Lock()
- self.bp.status.badPeers[self.id]++
- self.bp.status.lock.Unlock()
- // there is no persistence here, so GC will just take care of cleaning up
-
- // signal for peer switch, quit
- case <-self.switchC:
- var complete = "incomplete "
- if self.idle {
- complete = "complete"
- }
- glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
- break LOOP
-
- // global quit for blockpool
- case <-self.bp.quit:
- break LOOP
-
- // best
- case <-self.bestIdleTimer:
- self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash)))
-
- self.bp.status.lock.Lock()
- self.bp.status.badPeers[self.id]++
- self.bp.status.lock.Unlock()
- glog.V(logger.Detail).Infof("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
- }
- }
-
- if !self.idle {
- self.idle = true
- self.bp.wg.Done()
- }
-}