diff options
Diffstat (limited to 'blockpool/blockpool.go')
-rw-r--r-- | blockpool/blockpool.go | 184 |
1 files changed, 104 insertions, 80 deletions
diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index 3ed2e92c7..a60b6f43c 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -11,13 +11,11 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" "github.com/ethereum/go-ethereum/event" - ethlogger "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/pow" ) -var plog = ethlogger.NewLogger("Blockpool") - var ( // max number of block hashes sent in one request blockHashesBatchSize = 256 @@ -36,11 +34,13 @@ var ( // timeout interval: max time allowed for peer without sending a block blocksTimeout = 60 * time.Second // timeout interval: max time allowed for best peer to remain idle (not send new block after sync complete) - idleBestPeerTimeout = 120 * time.Second + idleBestPeerTimeout = 60 * time.Second // duration of suspension after peer fatal error during which peer is not allowed to reconnect peerSuspensionInterval = 300 * time.Second // status is logged every statusUpdateInterval statusUpdateInterval = 3 * time.Second + // + nodeCacheSize = 1000 ) // blockpool config, values default to constants @@ -49,6 +49,7 @@ type Config struct { BlockBatchSize int BlocksRequestRepetition int BlocksRequestMaxIdleRounds int + NodeCacheSize int BlockHashesRequestInterval time.Duration BlocksRequestInterval time.Duration BlockHashesTimeout time.Duration @@ -74,17 +75,19 @@ var errorToString = map[int]string{ ErrInvalidPoW: "Invalid PoW", // fatal ErrInsufficientChainInfo: "Insufficient chain info", // fatal ErrIdleTooLong: "Idle too long", // fatal - ErrIncorrectTD: "Incorrect Total Difficulty", // fatal + ErrIncorrectTD: "Incorrect Total Difficulty", // should be fatal, not now temporarily ErrUnrequestedBlock: "Unrequested block", } // error severity -func severity(code int) ethlogger.LogLevel { +func severity(code int) logger.LogLevel { switch code { + case ErrIncorrectTD: + return logger.WarnLevel case ErrUnrequestedBlock: - return ethlogger.WarnLevel + return logger.WarnLevel default: - return ethlogger.ErrorLevel + return logger.ErrorLevel } } @@ -120,6 +123,9 @@ func (self *Config) init() { if self.PeerSuspensionInterval == 0 { self.PeerSuspensionInterval = peerSuspensionInterval } + if self.NodeCacheSize == 0 { + self.NodeCacheSize = nodeCacheSize + } if self.StatusUpdateInterval == 0 { self.StatusUpdateInterval = statusUpdateInterval } @@ -171,6 +177,7 @@ type BlockPool struct { nodeCache map[common.Hash]*node nodeCacheLock sync.RWMutex + nodeCacheList []common.Hash // waitgroup is used in tests to wait for result-critical routines // as well as in determining idle / syncing status @@ -248,7 +255,7 @@ func (self *BlockPool) Start() { if (ev.Block.HeaderHash == common.Hash{}) { height = ev.Block.Header().Number } - plog.DebugDetailf("ChainHeadEvent: height: %v, td: %v, hash: %s", height, td, hex(ev.Block.Hash())) + glog.V(logger.Detail).Infof("ChainHeadEvent: height: %v, td: %v, hash: %s", height, td, hex(ev.Block.Hash())) self.setTD(td) self.peers.lock.Lock() @@ -262,11 +269,11 @@ func (self *BlockPool) Start() { self.peers.lock.Unlock() } case <-timer.C: - plog.DebugDetailf("status:\n%v", self.Status()) + glog.V(logger.Detail).Infof("status:\n%v", self.Status()) } } }() - glog.V(ethlogger.Info).Infoln("Blockpool started") + glog.V(logger.Info).Infoln("Blockpool started") } func (self *BlockPool) Stop() { @@ -279,7 +286,7 @@ func (self *BlockPool) Stop() { self.lock.Unlock() - plog.Infoln("Stopping...") + glog.V(logger.Info).Infoln("Stopping...") self.tdSub.Unsubscribe() close(self.quit) @@ -289,7 +296,7 @@ func (self *BlockPool) Stop() { self.pool = nil self.lock.Unlock() - plog.Infoln("Stopped") + glog.V(logger.Info).Infoln("Stopped") } // Wait blocks until active processes finish @@ -301,7 +308,7 @@ func (self *BlockPool) Wait(t time.Duration) { } self.lock.Unlock() - plog.Infoln("Waiting for processes to complete...") + glog.V(logger.Info).Infoln("Waiting for processes to complete...") w := make(chan bool) go func() { self.wg.Wait() @@ -310,9 +317,9 @@ func (self *BlockPool) Wait(t time.Duration) { select { case <-w: - plog.Infoln("Processes complete") + glog.V(logger.Info).Infoln("Processes complete") case <-time.After(t): - plog.Warnf("Timeout") + glog.V(logger.Warn).Infoln("Timeout") } } @@ -343,7 +350,7 @@ func (self *BlockPool) AddPeer( // RemovePeer needs to be called when the peer disconnects func (self *BlockPool) RemovePeer(peerId string) { - self.peers.removePeer(peerId) + self.peers.removePeer(peerId, true) } /* @@ -383,7 +390,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st hash, ok = next() bestpeer.lock.RLock() - plog.Debugf("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash)) + glog.V(logger.Debug).Infof("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash)) // first check if we are building the head section of a peer's chain if bestpeer.parentHash == hash { @@ -400,48 +407,45 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st */ headSection = true if entry := self.get(bestpeer.currentBlockHash); entry == nil { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash)) // if head block is not yet in the pool, create entry and start node list for section + self.nodeCacheLock.Lock() + n := self.findOrCreateNode(bestpeer.currentBlockHash, peerId) + n.block = bestpeer.currentBlock + n.blockBy = peerId + n.td = bestpeer.td + self.nodeCacheLock.Unlock() - node := &node{ - hash: bestpeer.currentBlockHash, - block: bestpeer.currentBlock, - hashBy: peerId, - blockBy: peerId, - td: bestpeer.td, - } // nodes is a list of nodes in one section ordered top-bottom (old to young) - nodes = append(nodes, node) - n++ + nodes = append(nodes, n) } else { // otherwise set child section iff found node is the root of a section // this is a possible scenario when a singleton head section was created // on an earlier occasion when this peer or another with the same block was best peer if entry.node == entry.section.bottom { child = entry.section - plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash)) } } } else { // otherwise : we are not building the head section of the peer - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(hash)) } // the switch channel signals peerswitch event - switchC := bestpeer.switchC bestpeer.lock.RUnlock() // iterate over hashes coming from peer (first round we have hash set above) LOOP: for ; ok; hash, ok = next() { - + n++ select { case <-self.quit: // global quit for blockpool return - case <-switchC: + case <-bestpeer.switchC: // if the peer is demoted, no more hashes read - plog.DebugDetailf("AddBlockHashes: demoted peer <%s> (head: %s)", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: demoted peer <%s> (head: %s)", peerId, hex(bestpeer.currentBlockHash), hex(hash)) peerswitch = true break LOOP default: @@ -450,9 +454,9 @@ LOOP: // if we reach the blockchain we stop reading further blockhashes if self.hasBlock(hash) { // check if known block connecting the downloaded chain to our blockchain - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) if len(nodes) == 1 { - plog.DebugDetailf("AddBlockHashes: singleton section pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: singleton section pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) // create new section if needed and push it to the blockchain sec = self.newSection(nodes) @@ -470,7 +474,7 @@ LOOP: and td together with blockBy are recorded on the node */ if len(nodes) == 0 && child != nil { - plog.DebugDetailf("AddBlockHashes: child section [%s] pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", sectionhex(child), peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: child section [%s] pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", sectionhex(child), peerId, hex(bestpeer.currentBlockHash), hex(hash)) child.addSectionToBlockChain(bestpeer) } @@ -490,23 +494,21 @@ LOOP: response to hashes request. Note that by providing <from> we can link sections without having to wait for the root block of the child section to arrive, so it allows for superior performance. */ - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) // record the entry's chain section as child section child = entry.section continue LOOP } // otherwise record entry's chain section as parent connecting it to the pool - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) parent = entry.section break LOOP } // finally if node for block hash does not exist, create it and append node to section nodes - node := &node{ - hash: hash, - hashBy: peerId, - } - nodes = append(nodes, node) + self.nodeCacheLock.Lock() + nodes = append(nodes, self.findOrCreateNode(hash, peerId)) + self.nodeCacheLock.Unlock() } //for /* @@ -518,13 +520,13 @@ LOOP: */ self.chainLock.Lock() - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes)) /* Handle forks where connecting node is mid-section by splitting section at fork. No splitting needed if connecting node is head of a section. */ if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash)) self.splitSection(parent, entry) @@ -537,10 +539,7 @@ LOOP: sec = self.linkSections(nodes, parent, child) if sec != nil { - self.status.lock.Lock() - self.status.values.BlockHashes += len(nodes) - self.status.lock.Unlock() - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) } self.chainLock.Unlock() @@ -554,10 +553,8 @@ LOOP: In this case no activation should happen */ if parent != nil && !peerswitch { - bestpeer.lock.RLock() + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent)) self.activateChain(parent, bestpeer, bestpeer.switchC, nil) - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent)) - bestpeer.lock.RUnlock() } /* @@ -578,10 +575,10 @@ LOOP: Otherwise no way to check if it arrived. */ bestpeer.requestBlockHashes(sec.bottom.hash) - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) sec.activate(bestpeer) } else { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) sec.deactivate() } } @@ -589,7 +586,7 @@ LOOP: // If we are processing peer's head section, signal it to headSection process that it is created. if headSection { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash)) var headSec *section switch { @@ -601,7 +598,7 @@ LOOP: headSec = parent } if !peerswitch { - plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec)) + glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec)) bestpeer.headSectionC <- headSec } } @@ -635,6 +632,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { if sender == nil { return } + sender.lock.Lock() tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block) entry := self.get(hash) @@ -643,7 +641,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section. delayed B sends you block ... UNREQUESTED. Blocked if entry == nil { - plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + glog.V(logger.Detail).Infof("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) sender.addError(ErrUnrequestedBlock, "%x", hash) self.status.lock.Lock() @@ -656,28 +654,17 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { var bnode *node if entry == nil { self.nodeCacheLock.Lock() - bnode, _ = self.nodeCache[hash] - if bnode == nil { - bnode = &node{ - hash: currentBlockHash, - block: block, - hashBy: peerId, - blockBy: peerId, - td: tdFromCurrentHead, - } - self.nodeCache[hash] = bnode - } + bnode = self.findOrCreateNode(currentBlockHash, peerId) self.nodeCacheLock.Unlock() } else { bnode = entry.node } bnode.lock.Lock() - defer bnode.lock.Unlock() // check if block already received if bnode.block != nil { - plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy) + glog.V(logger.Detail).Infof("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy) // register peer on node as source if bnode.peers == nil { bnode.peers = make(map[string]bool) @@ -699,7 +686,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { creation is not blocking // validate block for PoW if !self.verifyPoW(block) { - plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) + glog.V(logger.Warn).Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) sender.addError(ErrInvalidPoW, "%x", hash) self.status.lock.Lock() @@ -711,13 +698,49 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { */ bnode.block = block bnode.blockBy = peerId + glog.V(logger.Detail).Infof("AddBlock: set td on node %s from peer <%s> (head: %s) to %v (was %v) ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.td, tdFromCurrentHead) bnode.td = tdFromCurrentHead self.status.lock.Lock() self.status.values.Blocks++ self.status.values.BlocksInPool++ self.status.lock.Unlock() } + bnode.lock.Unlock() + currentBlockC := sender.currentBlockC + switchC := sender.switchC + sender.lock.Unlock() + + // this must be called without peerlock. + // peerlock held can halt the loop and block on select forever + if tdFromCurrentHead != nil { + select { + case currentBlockC <- block: + case <-switchC: // peer is not best peer + } + } +} +func (self *BlockPool) findOrCreateNode(hash common.Hash, peerId string) (bnode *node) { + bnode, _ = self.nodeCache[hash] + if bnode == nil { + bnode = &node{ + hash: hash, + hashBy: peerId, + } + self.nodeCache[hash] = bnode + // purge oversize cache + if len(self.nodeCache) > self.Config.NodeCacheSize { + delete(self.nodeCache, self.nodeCacheList[0]) + self.nodeCacheList = append(self.nodeCacheList[1:], hash) + } else { + self.nodeCacheList = append(self.nodeCacheList, hash) + } + + self.status.lock.Lock() + self.status.values.BlockHashes++ + self.status.lock.Unlock() + } + return } /* @@ -731,8 +754,8 @@ func (self *BlockPool) activateChain(sec *section, p *peer, switchC chan bool, c LOOP: for sec != nil { - parent := self.getParent(sec) - plog.DebugDetailf("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id) + parent := sec.parent + glog.V(logger.Detail).Infof("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id) sec.activate(p) if i > 0 && connected != nil { connected[sec.top.hash] = sec @@ -745,11 +768,11 @@ LOOP: if sec.bottom.block != nil { if entry := self.get(sec.bottom.block.ParentHash()); entry != nil { parent = entry.section - plog.DebugDetailf("activateChain: [%s]-[%s] link", sectionhex(parent), sectionhex(sec)) + glog.V(logger.Detail).Infof("activateChain: [%s]-[%s] link", sectionhex(parent), sectionhex(sec)) link(parent, sec) } } else { - plog.DebugDetailf("activateChain: section [%s] activated by peer <%s> has missing root block", sectionhex(sec), p.id) + glog.V(logger.Detail).Infof("activateChain: section [%s] activated by peer <%s> has missing root block", sectionhex(sec), p.id) } } sec = parent @@ -769,17 +792,18 @@ LOOP: func (self *BlockPool) checkTD(nodes ...*node) { for _, n := range nodes { // skip check if queued future block + n.lock.RLock() if n.td != nil && !n.block.Queued() { - plog.DebugDetailf("peer td %v =?= block td %v", n.td, n.block.Td) - /* @zelig: Commented out temp untill the rest of the network has been fixed. + glog.V(logger.Detail).Infof("peer td %v =?= block td %v", n.td, n.block.Td) + // @zelig: Commented out temp untill the rest of the network has been fixed. if n.td.Cmp(n.block.Td) != 0 { - self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x", n.hash) + self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x peer td %v =?= block td %v", n.hash, n.td, n.block.Td) self.status.lock.Lock() self.status.badPeers[n.blockBy]++ self.status.lock.Unlock() } - */ } + n.lock.RUnlock() } } |