aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/blockpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/blockpool.go')
-rw-r--r--blockpool/blockpool.go176
1 files changed, 98 insertions, 78 deletions
diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go
index b8cac4913..921d34949 100644
--- a/blockpool/blockpool.go
+++ b/blockpool/blockpool.go
@@ -38,10 +38,11 @@ var (
idleBestPeerTimeout = 120 * time.Second
// duration of suspension after peer fatal error during which peer is not allowed to reconnect
peerSuspensionInterval = 300 * time.Second
+ // status is logged every statusUpdateInterval
+ statusUpdateInterval = 3 * time.Second
)
-// config embedded in components, by default fall back to constants
-// by default all resolved to local
+// blockpool config, values default to constants
type Config struct {
BlockHashesBatchSize int
BlockBatchSize int
@@ -53,28 +54,40 @@ type Config struct {
BlocksTimeout time.Duration
IdleBestPeerTimeout time.Duration
PeerSuspensionInterval time.Duration
+ StatusUpdateInterval time.Duration
}
// blockpool errors
const (
ErrInvalidBlock = iota
ErrInvalidPoW
- ErrUnrequestedBlock
ErrInsufficientChainInfo
ErrIdleTooLong
ErrIncorrectTD
+ ErrUnrequestedBlock
)
+// error descriptions
var errorToString = map[int]string{
- ErrInvalidBlock: "Invalid block",
- ErrInvalidPoW: "Invalid PoW",
+ ErrInvalidBlock: "Invalid block", // fatal
+ ErrInvalidPoW: "Invalid PoW", // fatal
+ ErrInsufficientChainInfo: "Insufficient chain info", // fatal
+ ErrIdleTooLong: "Idle too long", // fatal
+ ErrIncorrectTD: "Incorrect Total Difficulty", // fatal
ErrUnrequestedBlock: "Unrequested block",
- ErrInsufficientChainInfo: "Insufficient chain info",
- ErrIdleTooLong: "Idle too long",
- ErrIncorrectTD: "Incorrect Total Difficulty",
}
-// init initialises all your laundry
+// error severity
+func severity(code int) ethlogger.LogLevel {
+ switch code {
+ case ErrUnrequestedBlock:
+ return ethlogger.WarnLevel
+ default:
+ return ethlogger.ErrorLevel
+ }
+}
+
+// init initialises the Config, zero values fall back to constants
func (self *Config) init() {
if self.BlockHashesBatchSize == 0 {
self.BlockHashesBatchSize = blockHashesBatchSize
@@ -106,6 +119,9 @@ func (self *Config) init() {
if self.PeerSuspensionInterval == 0 {
self.PeerSuspensionInterval = peerSuspensionInterval
}
+ if self.StatusUpdateInterval == 0 {
+ self.StatusUpdateInterval = statusUpdateInterval
+ }
}
// node is the basic unit of the internal model of block chain/tree in the blockpool
@@ -132,31 +148,35 @@ type entry struct {
type BlockPool struct {
Config *Config
- // the minimal interface with blockchain
- hasBlock func(hash common.Hash) bool
- insertChain func(types.Blocks) error
- verifyPoW func(pow.Block) bool
- chainEvents *event.TypeMux
+ // the minimal interface with blockchain manager
+ hasBlock func(hash common.Hash) bool // query if block is known
+ insertChain func(types.Blocks) error // add section to blockchain
+ verifyPoW func(pow.Block) bool // soft PoW verification
+ chainEvents *event.TypeMux // ethereum eventer for chainEvents
- tdSub event.Subscription
- td *big.Int
+ tdSub event.Subscription // subscription to core.ChainHeadEvent
+ td *big.Int // our own total difficulty
- pool map[string]*entry
- peers *peers
+ pool map[string]*entry // the actual blockpool
+ peers *peers // peers manager in peers.go
+
+ status *status // info about blockpool (UI interface) in status.go
lock sync.RWMutex
chainLock sync.RWMutex
// alloc-easy pool of hash slices
hashSlicePool chan []common.Hash
- status *status
-
- quit chan bool
- wg sync.WaitGroup
- running bool
+ // waitgroup is used in tests to wait for result-critical routines
+ // as well as in determining idle / syncing status
+ wg sync.WaitGroup //
+ quit chan bool // chan used for quitting parallel routines
+ running bool //
}
// public constructor
+// after blockpool returned, config can be set
+// BlockPool.Start will call Config.init to set missing values
func New(
hasBlock func(hash common.Hash) bool,
insertChain func(types.Blocks) error,
@@ -175,15 +195,6 @@ func New(
}
}
-func severity(code int) ethlogger.LogLevel {
- switch code {
- case ErrUnrequestedBlock:
- return ethlogger.WarnLevel
- default:
- return ethlogger.ErrorLevel
- }
-}
-
// allows restart
func (self *BlockPool) Start() {
self.lock.Lock()
@@ -193,7 +204,9 @@ func (self *BlockPool) Start() {
return
}
+ // set missing values
self.Config.init()
+
self.hashSlicePool = make(chan []common.Hash, 150)
self.status = newStatus()
self.quit = make(chan bool)
@@ -212,8 +225,11 @@ func (self *BlockPool) Start() {
bp: self,
}
+ // subscribe and listen to core.ChainHeadEvent{} for uptodate TD
self.tdSub = self.chainEvents.Subscribe(core.ChainHeadEvent{})
- timer := time.NewTicker(3 * time.Second)
+
+ // status update interval
+ timer := time.NewTicker(self.Config.StatusUpdateInterval)
go func() {
for {
select {
@@ -292,9 +308,14 @@ func (self *BlockPool) Wait(t time.Duration) {
/*
AddPeer is called by the eth protocol instance running on the peer after
the status message has been received with total difficulty and current block hash
-Called a second time with the same peer id, it is used to update chain info for a peer. This is used when a new (mined) block message is received.
+
+Called a second time with the same peer id, it is used to update chain info for a peer.
+This is used when a new (mined) block message is received.
+
RemovePeer needs to be called when the peer disconnects.
-Peer info is currently not persisted across disconnects (or sessions)
+
+Peer info is currently not persisted across disconnects (or sessions) except for suspension
+
*/
func (self *BlockPool) AddPeer(
@@ -319,12 +340,12 @@ AddBlockHashes
Entry point for eth protocol to add block hashes received via BlockHashesMsg
-only hashes from the best peer are handled
+Only hashes from the best peer are handled
-initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer)
-launches all block request processes on each chain section
+Initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer)
+Launches all block request processes on each chain section
-the first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns.
+The first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns.
*/
func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) {
@@ -335,7 +356,6 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st
// bestpeer is still the best peer
self.wg.Add(1)
-
defer func() { self.wg.Done() }()
self.status.lock.Lock()
@@ -360,11 +380,11 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st
return
}
/*
- when peer is promoted in switchPeer, a new header section process is launched
- as the head section skeleton is actually created here, it is signaled to the process
- so that it can quit
- in the special case that the node for parent of the head block is found in the blockpool
- (with or without fetched block)
+ When peer is promoted in switchPeer, a new header section process is launched.
+ Once the head section skeleton is actually created here, it is signaled to the process
+ so that it can quit.
+ In the special case that the node for parent of the head block is found in the blockpool
+ (with or without fetched block), a singleton section containing only the head block node is created.
*/
headSection = true
if entry := self.get(bestpeer.currentBlockHash); entry == nil {
@@ -383,7 +403,7 @@ func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId st
} else {
// otherwise set child section iff found node is the root of a section
// this is a possible scenario when a singleton head section was created
- // on an earlier occasion this peer or another with the same block was best peer
+ // on an earlier occasion when this peer or another with the same block was best peer
if entry.node == entry.section.bottom {
child = entry.section
plog.DebugDetailf("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash))
@@ -414,7 +434,7 @@ LOOP:
default:
}
- // if we reach the blockchain we stop reading more
+ // if we reach the blockchain we stop reading further blockhashes
if self.hasBlock(hash) {
// check if known block connecting the downloaded chain to our blockchain
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
@@ -451,10 +471,11 @@ LOOP:
// reached a known chain in the pool
if entry.node == entry.section.bottom && n == 1 {
/*
- the first block hash received is an orphan in the pool
- this also supports clients that (despite the spec) include <from> hash in their
+ The first block hash received is an orphan node in the pool
+
+ This also supports clients that (despite the spec) include <from> hash in their
response to hashes request. Note that by providing <from> we can link sections
- without having to wait for the root block of the child section to arrive, so it allows for superior performance
+ without having to wait for the root block of the child section to arrive, so it allows for superior performance.
*/
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
// record the entry's chain section as child section
@@ -486,9 +507,8 @@ LOOP:
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes))
/*
- handle forks where connecting node is mid-section
- by splitting section at fork
- no splitting needed if connecting node is head of a section
+ Handle forks where connecting node is mid-section by splitting section at fork.
+ No splitting needed if connecting node is head of a section.
*/
if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash))
@@ -500,10 +520,7 @@ LOOP:
self.status.lock.Unlock()
}
- /*
- if new section is created, link it to parent/child sections
- and launch section process fetching blocks and further hashes
- */
+ // If new section is created, link it to parent/child sections.
sec = self.linkSections(nodes, parent, child)
if sec != nil {
@@ -516,11 +533,12 @@ LOOP:
self.chainLock.Unlock()
/*
- if a blockpool node is reached (parent section is not nil),
+ If a blockpool node is reached (parent section is not nil),
activate section (unless our peer is demoted by now).
- this can be the bottom half of a newly split section in case of a fork.
+ This can be the bottom half of a newly split section in case of a fork.
+
bestPeer is nil if we got here after our peer got demoted while processing.
- in this case no activation should happen
+ In this case no activation should happen
*/
if parent != nil && !peerswitch {
self.activateChain(parent, bestpeer, nil)
@@ -528,9 +546,8 @@ LOOP:
}
/*
- if a new section was created,
- register section iff head section or no child known
- activate it with this peer
+ If a new section was created, register section iff head section or no child known
+ Activate it with this peer.
*/
if sec != nil {
// switch on section process (it is paused by switchC)
@@ -541,9 +558,9 @@ LOOP:
bestpeer.lock.Unlock()
}
/*
- request next block hashes for parent section here.
- but only once, repeating only when bottom block arrives,
- otherwise no way to check if it arrived
+ Request another batch of older block hashes for parent section here.
+ But only once, repeating only when the section's root block arrives.
+ Otherwise no way to check if it arrived.
*/
bestpeer.requestBlockHashes(sec.bottom.hash)
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
@@ -554,7 +571,7 @@ LOOP:
}
}
- // if we are processing peer's head section, signal it to headSection process that it is created
+ // If we are processing peer's head section, signal it to headSection process that it is created.
if headSection {
plog.DebugDetailf("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash))
@@ -578,11 +595,13 @@ LOOP:
/*
AddBlock is the entry point for the eth protocol to call when blockMsg is received.
- It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error
+ It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error.
At the same time it is opportunistic in that if a requested block may be provided by any peer.
The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit.
+
+ If the block received is the head block of the current best peer, signal it to the head section process
*/
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
hash := block.Hash()
@@ -611,6 +630,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
self.status.lock.Unlock()
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(sender.currentBlockHash))
+ // signal to head section process
sender.currentBlockC <- block
}
} else {
@@ -629,7 +649,6 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
sender.lock.Unlock()
if entry == nil {
- // penalise peer for sending what we have not asked
plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash)
@@ -647,7 +666,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
node.lock.Lock()
defer node.lock.Unlock()
- // check if block already present
+ // check if block already received
if node.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
return
@@ -683,9 +702,9 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
}
/*
- iterates down a chain section by section
- activating section process on incomplete sections with peer
- relinking orphaned sections with their parent if root block (and its parent hash) is known)
+ activateChain iterates down a chain section by section.
+ It activates the section process on incomplete sections with peer.
+ It relinks orphaned sections with their parent if root block (and its parent hash) is known.
*/
func (self *BlockPool) activateChain(sec *section, p *peer, connected map[string]*section) {
@@ -704,8 +723,8 @@ LOOP:
connected[sec.top.hash.Str()] = sec
}
/*
- we need to relink both complete and incomplete sections
- the latter could have been blockHashesRequestsComplete before being delinked from its parent
+ Need to relink both complete and incomplete sections
+ An incomplete section could have been blockHashesRequestsComplete before being delinked from its parent.
*/
if parent == nil {
if sec.bottom.block != nil {
@@ -720,7 +739,7 @@ LOOP:
}
sec = parent
- // stop if peer got demoted
+ // stop if peer got demoted or global quit
select {
case <-switchC:
break LOOP
@@ -731,6 +750,7 @@ LOOP:
}
}
+// check if block's actual TD (calculated after successful insertChain) is identical to TD advertised for peer's head block.
func (self *BlockPool) checkTD(nodes ...*node) {
for _, n := range nodes {
if n.td != nil {
@@ -742,7 +762,7 @@ func (self *BlockPool) checkTD(nodes ...*node) {
}
}
-// must run in separate go routine, otherwise
+// requestBlocks must run in separate go routine, otherwise
// switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock
func (self *BlockPool) requestBlocks(attempts int, hashes []common.Hash) {
self.wg.Add(1)
@@ -806,6 +826,7 @@ func (self *BlockPool) remove(sec *section) {
}
}
+// get/put for optimised allocation similar to sync.Pool
func (self *BlockPool) getHashSlice() (s []common.Hash) {
select {
case s = <-self.hashSlicePool:
@@ -815,7 +836,6 @@ func (self *BlockPool) getHashSlice() (s []common.Hash) {
return
}
-// Return returns a Client to the pool.
func (self *BlockPool) putHashSlice(s []common.Hash) {
if len(s) == self.Config.BlockBatchSize {
select {