aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/section.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/section.go')
-rw-r--r--blockpool/section.go677
1 files changed, 677 insertions, 0 deletions
diff --git a/blockpool/section.go b/blockpool/section.go
new file mode 100644
index 000000000..48ea15d31
--- /dev/null
+++ b/blockpool/section.go
@@ -0,0 +1,677 @@
+package blockpool
+
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+)
+
+/*
+ section is the worker on each chain section in the block pool
+ - remove the section if there are blocks missing after an absolute time
+ - remove the section if there are maxIdleRounds of idle rounds of block requests with no response
+ - periodically polls the chain section for missing blocks which are then requested from peers
+ - registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they removed (inserted in blockchain, invalid or expired)
+ - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
+ - when turned back on it recursively calls itself on the root of the next chain section
+*/
+type section struct {
+ lock sync.RWMutex
+
+ parent *section // connecting section back in time towards blockchain
+ child *section // connecting section forward in time
+
+ top *node // the topmost node = head node = youngest node within the chain section
+ bottom *node // the bottom node = root node = oldest node within the chain section
+ nodes []*node
+
+ peer *peer
+ parentHash []byte
+
+ blockHashes [][]byte
+
+ poolRootIndex int
+
+ bp *BlockPool
+
+ controlC chan *peer // to (de)register the current best peer
+ poolRootC chan *peer // indicate connectedness to blockchain (well, known blocks)
+ offC chan bool // closed if process terminated
+ suicideC chan bool // initiate suicide on the section
+ quitInitC chan bool // to signal end of initialisation
+ forkC chan chan bool // freeze section process while splitting
+ switchC chan bool // switching
+ idleC chan bool // channel to indicate thai food
+ processC chan *node //
+ missingC chan *node //
+
+ blocksRequestTimer <-chan time.Time
+ blockHashesRequestTimer <-chan time.Time
+ suicideTimer <-chan time.Time
+
+ blocksRequests int
+ blockHashesRequests int
+
+ blocksRequestsComplete bool
+ blockHashesRequestsComplete bool
+ ready bool
+ same bool
+ initialised bool
+ active bool
+
+ step int
+ idle int
+ missing int
+ lastMissing int
+ depth int
+ invalid bool
+ poolRoot bool
+}
+
+//
+func (self *BlockPool) newSection(nodes []*node) *section {
+ sec := &section{
+ bottom: nodes[len(nodes)-1],
+ top: nodes[0],
+ nodes: nodes,
+ poolRootIndex: len(nodes),
+ bp: self,
+ controlC: make(chan *peer),
+ poolRootC: make(chan *peer),
+ offC: make(chan bool),
+ }
+
+ for i, node := range nodes {
+ entry := &entry{node: node, section: sec, index: &index{i}}
+ self.set(node.hash, entry)
+ }
+
+ plog.DebugDetailf("[%s] setup section process", sectionhex(sec))
+
+ go sec.run()
+ return sec
+}
+
+func (self *section) addSectionToBlockChain(p *peer) {
+ self.bp.wg.Add(1)
+ go func() {
+
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ defer func() {
+ self.bp.wg.Done()
+ }()
+
+ var node *node
+ var keys []string
+ var blocks []*types.Block
+ for self.poolRootIndex > 0 {
+ node = self.nodes[self.poolRootIndex-1]
+ node.lock.RLock()
+ block := node.block
+ node.lock.RUnlock()
+ if block == nil {
+ break
+ }
+ self.poolRootIndex--
+ keys = append(keys, string(node.hash))
+ blocks = append(blocks, block)
+ }
+
+ if len(blocks) == 0 {
+ return
+ }
+
+ self.bp.lock.Lock()
+ for _, key := range keys {
+ delete(self.bp.pool, key)
+ }
+ self.bp.lock.Unlock()
+
+ plog.Infof("[%s] insert %v blocks [%v/%v] into blockchain", sectionhex(self), len(blocks), hex(blocks[0].Hash()), hex(blocks[len(blocks)-1].Hash()))
+ err := self.bp.insertChain(blocks)
+ if err != nil {
+ self.invalid = true
+ self.bp.peers.peerError(node.blockBy, ErrInvalidBlock, "%v", err)
+ plog.Warnf("invalid block %x", node.hash)
+ plog.Warnf("penalise peers %v (hash), %v (block)", node.hashBy, node.blockBy)
+
+ // or invalid block and the entire chain needs to be removed
+ self.removeInvalidChain()
+ } else {
+ // if all blocks inserted in this section
+ // then need to try to insert blocks in child section
+ if self.poolRootIndex == 0 {
+ // if there is a child section, then recursively call itself:
+ // also if section process is not terminated,
+ // then signal blockchain connectivity with poolRootC
+ if child := self.bp.getChild(self); child != nil {
+ select {
+ case <-child.offC:
+ plog.DebugDetailf("[%s] add complete child section [%s] to the blockchain", sectionhex(self), sectionhex(child))
+ case child.poolRootC <- p:
+ plog.DebugDetailf("[%s] add incomplete child section [%s] to the blockchain", sectionhex(self), sectionhex(child))
+ }
+ child.addSectionToBlockChain(p)
+ } else {
+ plog.DebugDetailf("[%s] no child section in pool", sectionhex(self))
+ }
+ plog.DebugDetailf("[%s] section completely inserted to blockchain - remove", sectionhex(self))
+ // complete sections are removed. if called from within section process,
+ // this must run in its own go routine to avoid deadlock
+ self.remove()
+ }
+ }
+
+ self.bp.status.lock.Lock()
+ if err == nil {
+ headKey := string(blocks[0].ParentHash())
+ height := self.bp.status.chain[headKey] + len(blocks)
+ self.bp.status.chain[string(blocks[len(blocks)-1].Hash())] = height
+ if height > self.bp.status.values.LongestChain {
+ self.bp.status.values.LongestChain = height
+ }
+ delete(self.bp.status.chain, headKey)
+ }
+ self.bp.status.values.BlocksInChain += len(blocks)
+ self.bp.status.values.BlocksInPool -= len(blocks)
+ if err != nil {
+ self.bp.status.badPeers[node.blockBy]++
+ }
+ self.bp.status.lock.Unlock()
+
+ }()
+
+}
+
+func (self *section) run() {
+
+ // absolute time after which sub-chain is killed if not complete (some blocks are missing)
+ self.suicideC = make(chan bool)
+ self.forkC = make(chan chan bool)
+ self.suicideTimer = time.After(self.bp.Config.BlocksTimeout)
+
+ // node channels for the section
+ // container for missing block hashes
+ var checking bool
+ var ping = time.NewTicker(5 * time.Second)
+
+LOOP:
+ for !self.blockHashesRequestsComplete || !self.blocksRequestsComplete {
+
+ select {
+ case <-ping.C:
+ var name = "no peer"
+ if self.peer != nil {
+ name = self.peer.id
+ }
+ plog.DebugDetailf("[%s] peer <%s> active: %v", sectionhex(self), name, self.active)
+
+ // global quit from blockpool
+ case <-self.bp.quit:
+ break LOOP
+
+ // pause for peer switching
+ case <-self.switchC:
+ self.switchC = nil
+
+ case p := <-self.poolRootC:
+ // signal on pool root channel indicates that the blockpool is
+ // connected to the blockchain, insert the longest chain of blocks
+ // ignored in idle mode to avoid inserting chain sections of non-live peers
+ self.poolRoot = true
+ // switch off hash requests in case they were on
+ self.blockHashesRequestTimer = nil
+ self.blockHashesRequestsComplete = true
+ self.switchOn(p)
+
+ // peer quit or demoted, put section in idle mode
+ case <-self.idleC:
+ // peer quit or demoted, put section in idle mode
+ plog.Debugf("[%s] peer <%s> quit or demoted", sectionhex(self), self.peer.id)
+ self.switchOff()
+ self.idleC = nil
+
+ // timebomb - if section is not complete in time, nuke the entire chain
+ case <-self.suicideTimer:
+ self.suicide()
+ plog.Debugf("[%s] timeout. (%v total attempts): missing %v/%v/%v...suicide", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
+ self.suicideTimer = nil
+
+ // closing suicideC triggers section suicide: removes section nodes from pool and terminates section process
+ case <-self.suicideC:
+ plog.DebugDetailf("[%s] suicide", sectionhex(self))
+ self.unlink()
+ self.bp.remove(self)
+ plog.DebugDetailf("[%s] done", sectionhex(self))
+ break LOOP
+
+ // alarm for checking blocks in the section
+ case <-self.blocksRequestTimer:
+ plog.DebugDetailf("[%s] alarm: block request time", sectionhex(self))
+ self.processC = self.missingC
+
+ // alarm for checking parent of the section or sending out hash requests
+ case <-self.blockHashesRequestTimer:
+ plog.DebugDetailf("[%s] alarm: hash request time", sectionhex(self))
+ self.blockHashesRequest()
+
+ // activate this section process with a peer
+ case p := <-self.controlC:
+ if p == nil {
+ self.switchOff()
+ } else {
+ self.switchOn(p)
+ }
+ self.bp.wg.Done()
+ // blocks the process until section is split at the fork
+ case waiter := <-self.forkC:
+ <-waiter
+ self.initialised = false
+ self.quitInitC = nil
+
+ //
+ case n, ok := <-self.processC:
+ // channel closed, first iteration finished
+ if !ok && !self.initialised {
+ plog.DebugDetailf("[%s] section initalised: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth)
+ self.initialised = true
+ self.processC = nil
+ // self.processC = make(chan *node, self.missing)
+ self.checkRound()
+ checking = false
+ break
+ }
+ plog.DebugDetailf("[%s] section proc step %v: missing %v/%v/%v", sectionhex(self), self.step, self.missing, self.lastMissing, self.depth)
+ if !checking {
+ self.step = 0
+ self.missing = 0
+ checking = true
+ }
+ self.step++
+
+ n.lock.RLock()
+ block := n.block
+ n.lock.RUnlock()
+
+ // if node has no block, request it (buffer it for batch request)
+ // feed it to missingC channel for the next round
+ if block == nil {
+ pos := self.missing % self.bp.Config.BlockBatchSize
+ if pos == 0 {
+ if self.missing != 0 {
+ self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:])
+ }
+ self.blockHashes = self.bp.getHashSlice()
+ }
+ self.blockHashes[pos] = n.hash
+ self.missing++
+ self.missingC <- n
+ } else {
+ // checking for parent block
+ if self.poolRoot {
+ // if node has got block (received via async AddBlock call from protocol)
+ if self.step == self.lastMissing {
+ // current root of the pool
+ plog.DebugDetailf("[%s] received block for current pool root %s", sectionhex(self), hex(n.hash))
+ self.addSectionToBlockChain(self.peer)
+ }
+ } else {
+ if self.parentHash == nil && n == self.bottom {
+ self.parentHash = block.ParentHash()
+ plog.DebugDetailf("[%s] got parent head block hash %s...checking", sectionhex(self), hex(self.parentHash))
+ self.blockHashesRequest()
+ }
+ }
+ }
+ if self.initialised && self.step == self.lastMissing {
+ plog.DebugDetailf("[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
+ self.checkRound()
+ checking = false
+ }
+ } // select
+ } // for
+
+ close(self.offC)
+ if self.peer != nil {
+ self.active = false
+ self.bp.wg.Done()
+ }
+
+ plog.DebugDetailf("[%s] section process terminated: %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts).", sectionhex(self), self.depth, self.blocksRequests, self.blockHashesRequests)
+
+}
+
+func (self *section) switchOn(newpeer *peer) {
+
+ oldpeer := self.peer
+ // reset switchC/switchC to current best peer
+ self.idleC = newpeer.idleC
+ self.switchC = newpeer.switchC
+ self.peer = newpeer
+
+ if oldpeer != newpeer {
+ oldp := "no peer"
+ newp := "no peer"
+ if oldpeer != nil {
+ oldp = oldpeer.id
+ }
+ if newpeer != nil {
+ newp = newpeer.id
+ }
+
+ plog.DebugDetailf("[%s] active mode <%s> -> <%s>", sectionhex(self), oldp, newp)
+ }
+
+ // activate section with current peer
+ if oldpeer == nil {
+ self.bp.wg.Add(1)
+ self.active = true
+
+ if !self.blockHashesRequestsComplete {
+ self.blockHashesRequestTimer = time.After(0)
+ }
+ if !self.blocksRequestsComplete {
+ if !self.initialised {
+ if self.quitInitC != nil {
+ <-self.quitInitC
+ }
+ self.missingC = make(chan *node, self.bp.Config.BlockHashesBatchSize)
+ self.processC = make(chan *node, self.bp.Config.BlockHashesBatchSize)
+ self.quitInitC = make(chan bool)
+
+ self.step = 0
+ self.missing = 0
+ self.depth = len(self.nodes)
+ self.lastMissing = self.depth
+
+ self.feedNodes()
+ } else {
+ self.blocksRequestTimer = time.After(0)
+ }
+ }
+ }
+}
+
+// put the section to idle mode
+func (self *section) switchOff() {
+ // active -> idle
+ if self.peer != nil {
+ oldp := "no peer"
+ oldpeer := self.peer
+ if oldpeer != nil {
+ oldp = oldpeer.id
+ }
+ plog.DebugDetailf("[%s] idle mode peer <%s> -> <> (%v total attempts): missing %v/%v/%v", sectionhex(self), oldp, self.blocksRequests, self.missing, self.lastMissing, self.depth)
+
+ self.active = false
+ self.peer = nil
+ // turn off timers
+ self.blocksRequestTimer = nil
+ self.blockHashesRequestTimer = nil
+
+ if self.quitInitC != nil {
+ <-self.quitInitC
+ self.quitInitC = nil
+ }
+ self.processC = nil
+ self.bp.wg.Done()
+ }
+}
+
+// iterates through nodes of a section to feed processC
+// used to initialise chain section
+func (self *section) feedNodes() {
+ // if not run at least once fully, launch iterator
+ self.bp.wg.Add(1)
+ go func() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ defer func() {
+ self.bp.wg.Done()
+ }()
+ var n *node
+ INIT:
+ for _, n = range self.nodes {
+ select {
+ case self.processC <- n:
+ case <-self.bp.quit:
+ break INIT
+ }
+ }
+ close(self.processC)
+ close(self.quitInitC)
+ }()
+}
+
+func (self *section) blockHashesRequest() {
+
+ if self.switchC != nil {
+ self.bp.chainLock.Lock()
+ parentSection := self.parent
+
+ if parentSection == nil {
+
+ // only link to new parent if not switching peers
+ // this protects against synchronisation issue where during switching
+ // a demoted peer's fork will be chosen over the best peer's chain
+ // because relinking the correct chain (activateChain) is overwritten here in
+ // demoted peer's section process just before the section is put to idle mode
+ if self.parentHash != nil {
+ if parent := self.bp.get(self.parentHash); parent != nil {
+ parentSection = parent.section
+ plog.DebugDetailf("[%s] blockHashesRequest: parent section [%s] linked\n", sectionhex(self), sectionhex(parentSection))
+ link(parentSection, self)
+ } else {
+ if self.bp.hasBlock(self.parentHash) {
+ self.poolRoot = true
+ plog.DebugDetailf("[%s] blockHashesRequest: parentHash known ... inserting section in blockchain", sectionhex(self))
+ self.addSectionToBlockChain(self.peer)
+ self.blockHashesRequestTimer = nil
+ self.blockHashesRequestsComplete = true
+ }
+ }
+ }
+ }
+ self.bp.chainLock.Unlock()
+
+ if !self.poolRoot {
+ if parentSection != nil {
+ // activate parent section with this peer
+ // but only if not during switch mode
+ plog.DebugDetailf("[%s] parent section [%s] activated\n", sectionhex(self), sectionhex(parentSection))
+ self.bp.activateChain(parentSection, self.peer, nil)
+ // if not root of chain, switch off
+ plog.DebugDetailf("[%s] parent found, hash requests deactivated (after %v total attempts)\n", sectionhex(self), self.blockHashesRequests)
+ self.blockHashesRequestTimer = nil
+ self.blockHashesRequestsComplete = true
+ } else {
+ self.blockHashesRequests++
+ plog.DebugDetailf("[%s] hash request on root (%v total attempts)\n", sectionhex(self), self.blockHashesRequests)
+ self.peer.requestBlockHashes(self.bottom.hash)
+ self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
+ }
+ }
+ }
+}
+
+// checks number of missing blocks after each round of request and acts accordingly
+func (self *section) checkRound() {
+ if self.missing == 0 {
+ // no missing blocks
+ plog.DebugDetailf("[%s] section checked: got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
+ self.blocksRequestsComplete = true
+ self.blocksRequestTimer = nil
+ } else {
+ // some missing blocks
+ plog.DebugDetailf("[%s] section checked: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth)
+ self.blocksRequests++
+ pos := self.missing % self.bp.Config.BlockBatchSize
+ if pos == 0 {
+ pos = self.bp.Config.BlockBatchSize
+ }
+ self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:pos])
+
+ // handle idle rounds
+ if self.missing == self.lastMissing {
+ // idle round
+ if self.same {
+ // more than once
+ self.idle++
+ // too many idle rounds
+ if self.idle >= self.bp.Config.BlocksRequestMaxIdleRounds {
+ plog.DebugDetailf("[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", sectionhex(self), self.idle, self.blocksRequests, self.missing, self.lastMissing, self.depth)
+ self.suicide()
+ }
+ } else {
+ self.idle = 0
+ }
+ self.same = true
+ } else {
+ self.same = false
+ }
+ self.lastMissing = self.missing
+ // put processC offline
+ self.processC = nil
+ self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
+ }
+}
+
+/*
+ link connects two sections via parent/child fields
+ creating a doubly linked list
+ caller must hold BlockPool chainLock
+*/
+func link(parent *section, child *section) {
+ if parent != nil {
+ exChild := parent.child
+ parent.child = child
+ if exChild != nil && exChild != child {
+ if child != nil {
+ // if child is nil it is not a real fork
+ plog.DebugDetailf("[%s] chain fork [%s] -> [%s]", sectionhex(parent), sectionhex(exChild), sectionhex(child))
+ }
+ exChild.parent = nil
+ }
+ }
+ if child != nil {
+ exParent := child.parent
+ if exParent != nil && exParent != parent {
+ if parent != nil {
+ // if parent is nil it is not a real fork, but suicide delinking section
+ plog.DebugDetailf("[%s] chain reverse fork [%s] -> [%s]", sectionhex(child), sectionhex(exParent), sectionhex(parent))
+ }
+ exParent.child = nil
+ }
+ child.parent = parent
+ }
+}
+
+/*
+ handle forks where connecting node is mid-section
+ by splitting section at fork
+ no splitting needed if connecting node is head of a section
+ caller must hold chain lock
+*/
+func (self *BlockPool) splitSection(parent *section, entry *entry) {
+ plog.DebugDetailf("[%s] split section at fork", sectionhex(parent))
+ parent.deactivate()
+ waiter := make(chan bool)
+ parent.wait(waiter)
+ chain := parent.nodes
+ parent.nodes = chain[entry.index.int:]
+ parent.top = parent.nodes[0]
+ parent.poolRootIndex -= entry.index.int
+ orphan := self.newSection(chain[0:entry.index.int])
+ link(orphan, parent.child)
+ close(waiter)
+ orphan.deactivate()
+}
+
+func (self *section) wait(waiter chan bool) {
+ self.forkC <- waiter
+}
+
+func (self *BlockPool) linkSections(nodes []*node, parent, child *section) (sec *section) {
+ // if new section is created, link it to parent/child sections
+ // and launch section process fetching block and further hashes
+ if len(nodes) > 0 {
+ sec = self.newSection(nodes)
+ plog.Debugf("[%s]->[%s](%v)->[%s] new chain section", sectionhex(parent), sectionhex(sec), len(nodes), sectionhex(child))
+ link(parent, sec)
+ link(sec, child)
+ } else {
+ // now this can only happen if we allow response to hash request to include <from> hash
+ // in this case we just link parent and child (without needing root block of child section)
+ plog.Debugf("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child))
+ link(parent, child)
+ }
+ return
+}
+
+func (self *section) activate(p *peer) {
+ self.bp.wg.Add(1)
+ select {
+ case <-self.offC:
+ self.bp.wg.Done()
+ case self.controlC <- p:
+ plog.DebugDetailf("[%s] activate section process for peer <%s>", sectionhex(self), p.id)
+ }
+}
+
+func (self *section) deactivate() {
+ self.bp.wg.Add(1)
+ self.controlC <- nil
+}
+
+func (self *section) suicide() {
+ select {
+ case <-self.suicideC:
+ return
+ default:
+ }
+ close(self.suicideC)
+}
+
+// removes this section exacly
+func (self *section) remove() {
+ select {
+ case <-self.offC:
+ // section is complete, no process
+ self.unlink()
+ self.bp.remove(self)
+ close(self.suicideC)
+ plog.DebugDetailf("[%s] remove: suicide", sectionhex(self))
+ case <-self.suicideC:
+ plog.DebugDetailf("[%s] remove: suicided already", sectionhex(self))
+ default:
+ plog.DebugDetailf("[%s] remove: suicide", sectionhex(self))
+ close(self.suicideC)
+ }
+ plog.DebugDetailf("[%s] removed section.", sectionhex(self))
+
+}
+
+// remove a section and all its descendents from the pool
+func (self *section) removeInvalidChain() {
+ // need to get the child before removeSection delinks the section
+ self.bp.chainLock.RLock()
+ child := self.child
+ self.bp.chainLock.RUnlock()
+
+ plog.DebugDetailf("[%s] remove invalid chain", sectionhex(self))
+ self.remove()
+ if child != nil {
+ child.removeInvalidChain()
+ }
+}
+
+// unlink a section from its parent/child
+func (self *section) unlink() {
+ // first delink from child and parent under chainlock
+ self.bp.chainLock.Lock()
+ link(nil, self)
+ link(self, nil)
+ self.bp.chainLock.Unlock()
+}