diff options
Diffstat (limited to 'blockpool/blockpool.go')
-rw-r--r-- | blockpool/blockpool.go | 176 |
1 files changed, 98 insertions, 78 deletions
diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index b8cac4913..921d34949 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -38,10 +38,11 @@ var ( idleBestPeerTimeout = 120 * time.Second // duration of suspension after peer fatal error during which peer is not allowed to reconnect peerSuspensionInterval = 300 * time.Second + // status is logged every statusUpdateInterval + statusUpdateInterval = 3 * time.Second ) -// config embedded in components, by default fall back to constants -// by default all resolved to local +// blockpool config, values default to constants type Config struct { BlockHashesBatchSize int BlockBatchSize int @@ -53,28 +54,40 @@ type Config struct { BlocksTimeout time.Duration IdleBestPeerTimeout time.Duration PeerSuspensionInterval time.Duration + StatusUpdateInterval time.Duration } // blockpool errors const ( ErrInvalidBlock = iota ErrInvalidPoW - ErrUnrequestedBlock ErrInsufficientChainInfo ErrIdleTooLong ErrIncorrectTD + ErrUnrequestedBlock ) +// error descriptions var errorToString = map[int]string{ - ErrInvalidBlock: "Invalid block", - ErrInvalidPoW: "Invalid PoW", + ErrInvalidBlock: "Invalid block", // fatal + ErrInvalidPoW: "Invalid PoW", // fatal + ErrInsufficientChainInfo: "Insufficient chain info", // fatal + ErrIdleTooLong: "Idle too long", // fatal + ErrIncorrectTD: "Incorrect Total Difficulty", // fatal ErrUnrequestedBlock: "Unrequested block", - ErrInsufficientChainInfo: "Insufficient chain info", - ErrIdleTooLong: "Idle too long", - ErrIncorrectTD: "Incorrect Total Difficulty", } -// init initialises all your laundry +// error severity +func severity(code int) ethlogger.LogLevel { + switch code { + case ErrUnrequestedBlock: + return ethlogger.WarnLevel + default: + return ethlogger.ErrorLevel + } +} + +// init initialises the Config, zero values fall back to constants func (self *Config) init() { if self.BlockHashesBatchSize == 0 { self.BlockHashesBatchSize = blockHashesBatchSize @@ -106,6 +119,9 @@ func (self *Config) init() { if self.PeerSuspensionInterval == 0 { self.PeerSuspensionInterval = peerSuspensionInterval } + if self.StatusUpdateInterval == 0 { + self.StatusUpdateInterval = statusUpdateInterval + } } // node is the basic unit of the internal model of block chain/tree in the blockpool @@ -132,31 +148,35 @@ type entry struct { type BlockPool struct { Config *Config - // the minimal interface with blockchain - hasBlock func(hash common.Hash) bool - insertChain func(types.Blocks) error - verifyPoW func(pow.Block) bool - chainEvents *event.TypeMux + // the minimal interface with blockchain manager + hasBlock func(hash common.Hash) bool // query if block is known + insertChain func(types.Blocks) error // add section to blockchain + verifyPoW func(pow.Block) bool // soft PoW verification + chainEvents *event.TypeMux // ethereum eventer for chainEvents - tdSub event.Subscription - td *big.Int + tdSub event.Subscription // subscription to core.ChainHeadEvent + td *big.Int // our own total difficulty - pool map[string]*entry - peers *peers + pool map[string]*entry // the actual blockpool + peers *peers // peers manager in peers.go + + status *status // info about blockpool (UI interface) in status.go lock sync.RWMutex chainLock sync.RWMutex // alloc-easy pool of hash slices hashSlicePool chan []common.Hash - status *status - - quit chan bool - wg sync.WaitGroup - running bool + // waitgroup is used in tests to wait for result-critical routines + // as well as in determining idle / syncing status + wg sync.WaitGroup // + quit chan bool // chan used for quitting parallel routines + running bool // } // public constructor +// after blockpool returned, config can be set +// BlockPool.Start will call Config.init to set missing values func New( hasBlock func(hash common.Hash) bool, insertChain func(types.Blocks) error, @@ -175,15 +195,6 @@ func New( } } -func severity(code int) ethlogger.LogLevel { - switch code { - case ErrUnrequestedBlock: - return ethlogger.WarnLevel - default: - return ethlogger.ErrorLevel - } -} - // allows restart func (self *BlockPool) Start() { self.lock.Lock() @@ -193,7 +204,9 @@ func (self *BlockPool) Start() { return } + // set missing values self.Config.init() + self.hashSlicePool = make(chan []common.Hash, 150) self.status = newStatus() self.quit = make(chan bool) @@ -212,8 +225,11 @@ func (self *BlockPool) Start() { bp: self, } + // subscribe and listen to core.ChainHeadEvent{} for uptodate TD self.tdSub = self.chainEvents.Subscribe(core.ChainHeadEvent{}) - timer := time.NewTicker(3 * time.Second) + + // status update interval + timer := time.NewTicker(self.Config.StatusUpdateInterval) go func() { for { select { @@ -292,9 +308,14 @@ func (self *BlockPool) Wait(t time.Duration) { /* AddPeer is called by the eth protocol instance running on the peer after the status message has been received with total difficulty and current block hash -Called a second time with the same peer id, it is used to update chain info for a peer. This is used when a new (mined) block message is received. + +Called a second time with the same peer id, it is used to update chain info for a peer. +This is used when a new (mined) block message is received. + RemovePeer needs to be called when the peer disconnects. -Peer info is currently not persisted across disconnects (or sessions) + +Peer info is currently not persisted across disconnects (or sessions) except for suspension + */ func (self *BlockPool) AddPeer( @@ -319,12 +340,12 @@ AddBlockHashes Entry point for eth protocol to add block hashes received via BlockHashesMsg -only hashes from the best peer are handled +Only hashes from the best peer are handled -initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer) -launches all block request processes on each chain section +Initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer) +Launches all block request processes on each chain section -the first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns. +The first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns. */ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) { @@ -335,7 +356,6 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st // bestpeer is still the best peer self.wg.Add(1) - defer func() { self.wg.Done() }() self.status.lock.Lock() @@ -360,11 +380,11 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st return } /* - when peer is promoted in switchPeer, a new header section process is launched - as the head section skeleton is actually created here, it is signaled to the process - so that it can quit - in the special case that the node for parent of the head block is found in the blockpool - (with or without fetched block) + When peer is promoted in switchPeer, a new header section process is launched. + Once the head section skeleton is actually created here, it is signaled to the process + so that it can quit. + In the special case that the node for parent of the head block is found in the blockpool + (with or without fetched block), a singleton section containing only the head block node is created. */ headSection = true if entry := self.get(bestpeer.currentBlockHash); entry == nil { @@ -383,7 +403,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st } else { // otherwise set child section iff found node is the root of a section // this is a possible scenario when a singleton head section was created - // on an earlier occasion this peer or another with the same block was best peer + // on an earlier occasion when this peer or another with the same block was best peer if entry.node == entry.section.bottom { child = entry.section plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash)) @@ -414,7 +434,7 @@ LOOP: default: } - // if we reach the blockchain we stop reading more + // if we reach the blockchain we stop reading further blockhashes if self.hasBlock(hash) { // check if known block connecting the downloaded chain to our blockchain plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash)) @@ -451,10 +471,11 @@ LOOP: // reached a known chain in the pool if entry.node == entry.section.bottom && n == 1 { /* - the first block hash received is an orphan in the pool - this also supports clients that (despite the spec) include <from> hash in their + The first block hash received is an orphan node in the pool + + This also supports clients that (despite the spec) include <from> hash in their response to hashes request. Note that by providing <from> we can link sections - without having to wait for the root block of the child section to arrive, so it allows for superior performance + without having to wait for the root block of the child section to arrive, so it allows for superior performance. */ plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section)) // record the entry's chain section as child section @@ -486,9 +507,8 @@ LOOP: plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes)) /* - handle forks where connecting node is mid-section - by splitting section at fork - no splitting needed if connecting node is head of a section + Handle forks where connecting node is mid-section by splitting section at fork. + No splitting needed if connecting node is head of a section. */ if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 { plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash)) @@ -500,10 +520,7 @@ LOOP: self.status.lock.Unlock() } - /* - if new section is created, link it to parent/child sections - and launch section process fetching blocks and further hashes - */ + // If new section is created, link it to parent/child sections. sec = self.linkSections(nodes, parent, child) if sec != nil { @@ -516,11 +533,12 @@ LOOP: self.chainLock.Unlock() /* - if a blockpool node is reached (parent section is not nil), + If a blockpool node is reached (parent section is not nil), activate section (unless our peer is demoted by now). - this can be the bottom half of a newly split section in case of a fork. + This can be the bottom half of a newly split section in case of a fork. + bestPeer is nil if we got here after our peer got demoted while processing. - in this case no activation should happen + In this case no activation should happen */ if parent != nil && !peerswitch { self.activateChain(parent, bestpeer, nil) @@ -528,9 +546,8 @@ LOOP: } /* - if a new section was created, - register section iff head section or no child known - activate it with this peer + If a new section was created, register section iff head section or no child known + Activate it with this peer. */ if sec != nil { // switch on section process (it is paused by switchC) @@ -541,9 +558,9 @@ LOOP: bestpeer.lock.Unlock() } /* - request next block hashes for parent section here. - but only once, repeating only when bottom block arrives, - otherwise no way to check if it arrived + Request another batch of older block hashes for parent section here. + But only once, repeating only when the section's root block arrives. + Otherwise no way to check if it arrived. */ bestpeer.requestBlockHashes(sec.bottom.hash) plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec)) @@ -554,7 +571,7 @@ LOOP: } } - // if we are processing peer's head section, signal it to headSection process that it is created + // If we are processing peer's head section, signal it to headSection process that it is created. if headSection { plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash)) @@ -578,11 +595,13 @@ LOOP: /* AddBlock is the entry point for the eth protocol to call when blockMsg is received. - It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error + It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error. At the same time it is opportunistic in that if a requested block may be provided by any peer. The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit. + + If the block received is the head block of the current best peer, signal it to the head section process */ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { hash := block.Hash() @@ -611,6 +630,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { self.status.lock.Unlock() } else { plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash)) + // signal to head section process sender.currentBlockC <- block } } else { @@ -629,7 +649,6 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { sender.lock.Unlock() if entry == nil { - // penalise peer for sending what we have not asked plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash)) sender.addError(ErrUnrequestedBlock, "%x", hash) @@ -647,7 +666,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { node.lock.Lock() defer node.lock.Unlock() - // check if block already present + // check if block already received if node.block != nil { plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy) return @@ -683,9 +702,9 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) { } /* - iterates down a chain section by section - activating section process on incomplete sections with peer - relinking orphaned sections with their parent if root block (and its parent hash) is known) + activateChain iterates down a chain section by section. + It activates the section process on incomplete sections with peer. + It relinks orphaned sections with their parent if root block (and its parent hash) is known. */ func (self *BlockPool) activateChain(sec *section, p *peer, connected map[string]*section) { @@ -704,8 +723,8 @@ LOOP: connected[sec.top.hash.Str()] = sec } /* - we need to relink both complete and incomplete sections - the latter could have been blockHashesRequestsComplete before being delinked from its parent + Need to relink both complete and incomplete sections + An incomplete section could have been blockHashesRequestsComplete before being delinked from its parent. */ if parent == nil { if sec.bottom.block != nil { @@ -720,7 +739,7 @@ LOOP: } sec = parent - // stop if peer got demoted + // stop if peer got demoted or global quit select { case <-switchC: break LOOP @@ -731,6 +750,7 @@ LOOP: } } +// check if block's actual TD (calculated after successful insertChain) is identical to TD advertised for peer's head block. func (self *BlockPool) checkTD(nodes ...*node) { for _, n := range nodes { if n.td != nil { @@ -742,7 +762,7 @@ func (self *BlockPool) checkTD(nodes ...*node) { } } -// must run in separate go routine, otherwise +// requestBlocks must run in separate go routine, otherwise // switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock func (self *BlockPool) requestBlocks(attempts int, hashes []common.Hash) { self.wg.Add(1) @@ -806,6 +826,7 @@ func (self *BlockPool) remove(sec *section) { } } +// get/put for optimised allocation similar to sync.Pool func (self *BlockPool) getHashSlice() (s []common.Hash) { select { case s = <-self.hashSlicePool: @@ -815,7 +836,6 @@ func (self *BlockPool) getHashSlice() (s []common.Hash) { return } -// Return returns a Client to the pool. func (self *BlockPool) putHashSlice(s []common.Hash) { if len(s) == self.Config.BlockBatchSize { select { |