aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/peers.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r--blockpool/peers.go54
1 files changed, 28 insertions, 26 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go
index 3f514c9e9..e5d6a16d6 100644
--- a/blockpool/peers.go
+++ b/blockpool/peers.go
@@ -57,7 +57,8 @@ type peer struct {
// peers is the component keeping a record of peers in a hashmap
//
type peers struct {
- lock sync.RWMutex
+ lock sync.RWMutex
+ bllock sync.Mutex
bp *BlockPool
errors *errs.Errors
@@ -109,15 +110,15 @@ func (self *peers) peerError(id string, code int, format string, params ...inter
// record time of offence in blacklist to implement suspension for PeerSuspensionInterval
func (self *peers) addToBlacklist(id string) {
- self.lock.Lock()
- defer self.lock.Unlock()
+ self.bllock.Lock()
+ defer self.bllock.Unlock()
self.blacklist[id] = time.Now()
}
-// suspended checks if peer is still suspended
+// suspended checks if peer is still suspended, caller should hold peers.lock
func (self *peers) suspended(id string) (s bool) {
- self.lock.Lock()
- defer self.lock.Unlock()
+ self.bllock.Lock()
+ defer self.bllock.Unlock()
if suspendedAt, ok := self.blacklist[id]; ok {
if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s {
// no longer suspended, delete entry
@@ -142,9 +143,8 @@ func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
self.headSection = nil
}
+// caller must hold peer lock
func (self *peer) setChainInfoFromBlock(block *types.Block) {
- self.lock.Lock()
- defer self.lock.Unlock()
// use the optional TD to update peer td, this helps second best peer selection
// in case best peer is lost
if block.Td != nil && block.Td.Cmp(self.td) > 0 {
@@ -155,16 +155,12 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) {
self.currentBlock = block
self.headSection = nil
}
- self.bp.wg.Add(1)
- go func() {
- self.currentBlockC <- block
- self.bp.wg.Done()
- }()
}
// distribute block request among known peers
func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
self.lock.RLock()
+
defer self.lock.RUnlock()
peerCount := len(self.peers)
// on first attempt use the best peer
@@ -210,13 +206,14 @@ func (self *peers) addPeer(
peerError func(*errs.Error),
) (best bool, suspended bool) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
var previousBlockHash common.Hash
if self.suspended(id) {
suspended = true
return
}
- self.lock.Lock()
- defer self.lock.Unlock()
p, found := self.peers[id]
if found {
// when called on an already connected peer, it means a newBlockMsg is received
@@ -260,7 +257,7 @@ func (self *peers) addPeer(
p.headSectionC <- nil
if entry := self.bp.get(previousBlockHash); entry != nil {
plog.DebugDetailf("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash))
- self.bp.activateChain(entry.section, p, nil)
+ self.bp.activateChain(entry.section, p, p.switchC, nil)
p.sections = append(p.sections, previousBlockHash)
}
}
@@ -270,8 +267,8 @@ func (self *peers) addPeer(
currentTD := self.bp.getTD()
bestpeer := self.best
if bestpeer != nil {
- bestpeer.lock.Lock()
- defer bestpeer.lock.Unlock()
+ bestpeer.lock.RLock()
+ defer bestpeer.lock.RUnlock()
currentTD = self.best.td
}
if td.Cmp(currentTD) > 0 {
@@ -367,14 +364,14 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
if connected[hash] == nil {
// if not deleted, then reread from pool (it can be orphaned top half of a split section)
if entry := self.get(hash); entry != nil {
- self.activateChain(entry.section, newp, connected)
+ self.activateChain(entry.section, newp, newp.switchC, connected)
connected[hash] = entry.section
sections = append(sections, hash)
}
}
}
plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
- // need to lock now that newp is exposed to section processes
+ // need to lock now that newp is exposed to section processesr
newp.lock.Lock()
newp.sections = sections
newp.lock.Unlock()
@@ -462,6 +459,8 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) {
}
func (self *peer) getBlockHashes() bool {
+ self.lock.Lock()
+ defer self.lock.Unlock()
//if connecting parent is found
if self.bp.hasBlock(self.parentHash) {
plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
@@ -474,13 +473,16 @@ func (self *peer) getBlockHashes() bool {
self.addError(ErrInvalidBlock, "%v", err)
self.bp.status.badPeers[self.id]++
} else {
+ /* @zelig: Commented out temp untill the rest of the network has been fixed.
// XXX added currentBlock check (?)
- if self.currentBlock != nil && self.currentBlock.Td != nil {
+ if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() {
+ plog.DebugDetailf("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td)
if self.td.Cmp(self.currentBlock.Td) != 0 {
- //self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash)
- //self.bp.status.badPeers[self.id]++
+ self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash)
+ self.bp.status.badPeers[self.id]++
}
}
+ */
headKey := self.parentHash
height := self.bp.status.chain[headKey] + 1
self.bp.status.chain[self.currentBlockHash] = height
@@ -504,7 +506,7 @@ func (self *peer) getBlockHashes() bool {
self.bp.newSection([]*node{n}).activate(self)
} else {
plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
- self.bp.activateChain(parent.section, self, nil)
+ self.bp.activateChain(parent.section, self, self.switchC, nil)
}
} else {
plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
@@ -528,6 +530,7 @@ func (self *peer) run() {
self.lock.RLock()
switchC := self.switchC
+ plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash))
self.lock.RUnlock()
self.blockHashesRequestTimer = nil
@@ -570,7 +573,6 @@ LOOP:
self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock()
// there is no persistence here, so GC will just take care of cleaning up
- break LOOP
// signal for peer switch, quit
case <-switchC:
@@ -593,9 +595,9 @@ LOOP:
self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock()
plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
- break LOOP
}
}
+
if !self.idle {
self.idle = true
self.bp.wg.Done()