aboutsummaryrefslogtreecommitdiffstats
path: root/eth/block_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/block_pool.go')
-rw-r--r--eth/block_pool.go1015
1 files changed, 1015 insertions, 0 deletions
diff --git a/eth/block_pool.go b/eth/block_pool.go
new file mode 100644
index 000000000..7cfbc63f8
--- /dev/null
+++ b/eth/block_pool.go
@@ -0,0 +1,1015 @@
+package eth
+
+import (
+ "math"
+ "math/big"
+ "math/rand"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethutil"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/pow"
+)
+
+var poolLogger = ethlogger.NewLogger("Blockpool")
+
+const (
+ blockHashesBatchSize = 256
+ blockBatchSize = 64
+ blocksRequestInterval = 10 // seconds
+ blocksRequestRepetition = 1
+ blockHashesRequestInterval = 10 // seconds
+ blocksRequestMaxIdleRounds = 10
+ cacheTimeout = 3 // minutes
+ blockTimeout = 5 // minutes
+)
+
+type poolNode struct {
+ lock sync.RWMutex
+ hash []byte
+ block *types.Block
+ child *poolNode
+ parent *poolNode
+ section *section
+ knownParent bool
+ peer string
+ source string
+ complete bool
+}
+
+type BlockPool struct {
+ lock sync.RWMutex
+ pool map[string]*poolNode
+
+ peersLock sync.RWMutex
+ peers map[string]*peerInfo
+ peer *peerInfo
+
+ quit chan bool
+ wg sync.WaitGroup
+ running bool
+
+ // the minimal interface with blockchain
+ hasBlock func(hash []byte) bool
+ insertChain func(types.Blocks) error
+ verifyPoW func(pow.Block) bool
+}
+
+type peerInfo struct {
+ lock sync.RWMutex
+
+ td *big.Int
+ currentBlock []byte
+ id string
+
+ requestBlockHashes func([]byte) error
+ requestBlocks func([][]byte) error
+ peerError func(int, string, ...interface{})
+
+ sections map[string]*section
+ roots []*poolNode
+ quitC chan bool
+}
+
+func NewBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(pow.Block) bool,
+) *BlockPool {
+ return &BlockPool{
+ hasBlock: hasBlock,
+ insertChain: insertChain,
+ verifyPoW: verifyPoW,
+ }
+}
+
+// allows restart
+func (self *BlockPool) Start() {
+ self.lock.Lock()
+ if self.running {
+ self.lock.Unlock()
+ return
+ }
+ self.running = true
+ self.quit = make(chan bool)
+ self.pool = make(map[string]*poolNode)
+ self.lock.Unlock()
+
+ self.peersLock.Lock()
+ self.peers = make(map[string]*peerInfo)
+ self.peersLock.Unlock()
+
+ poolLogger.Infoln("Started")
+
+}
+
+func (self *BlockPool) Stop() {
+ self.lock.Lock()
+ if !self.running {
+ self.lock.Unlock()
+ return
+ }
+ self.running = false
+ self.lock.Unlock()
+
+ poolLogger.Infoln("Stopping")
+
+ close(self.quit)
+ self.lock.Lock()
+ self.peersLock.Lock()
+ self.peers = nil
+ self.pool = nil
+ self.peer = nil
+ self.wg.Wait()
+ self.lock.Unlock()
+ self.peersLock.Unlock()
+ poolLogger.Infoln("Stopped")
+
+}
+
+// AddPeer is called by the eth protocol instance running on the peer after
+// the status message has been received with total difficulty and current block hash
+// AddPeer can only be used once, RemovePeer needs to be called when the peer disconnects
+func (self *BlockPool) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) bool {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ if self.peers[peerId] != nil {
+ panic("peer already added")
+ }
+ peer := &peerInfo{
+ td: td,
+ currentBlock: currentBlock,
+ id: peerId, //peer.Identity().Pubkey()
+ requestBlockHashes: requestBlockHashes,
+ requestBlocks: requestBlocks,
+ peerError: peerError,
+ }
+ self.peers[peerId] = peer
+ poolLogger.Debugf("add new peer %v with td %v", peerId, td)
+ currentTD := ethutil.Big0
+ if self.peer != nil {
+ currentTD = self.peer.td
+ }
+ if td.Cmp(currentTD) > 0 {
+ self.peer.stop(peer)
+ peer.start(self.peer)
+ poolLogger.Debugf("peer %v promoted to best peer", peerId)
+ self.peer = peer
+ return true
+ }
+ return false
+}
+
+// RemovePeer is called by the eth protocol when the peer disconnects
+func (self *BlockPool) RemovePeer(peerId string) {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ peer := self.peers[peerId]
+ if peer == nil {
+ return
+ }
+ self.peers[peerId] = nil
+ poolLogger.Debugf("remove peer %v", peerId[0:4])
+
+ // if current best peer is removed, need find a better one
+ if self.peer != nil && peerId == self.peer.id {
+ var newPeer *peerInfo
+ max := ethutil.Big0
+ // peer with the highest self-acclaimed TD is chosen
+ for _, info := range self.peers {
+ if info.td.Cmp(max) > 0 {
+ max = info.td
+ newPeer = info
+ }
+ }
+ self.peer.stop(peer)
+ peer.start(self.peer)
+ if newPeer != nil {
+ poolLogger.Debugf("peer %v with td %v promoted to best peer", newPeer.id[0:4], newPeer.td)
+ } else {
+ poolLogger.Warnln("no peers left")
+ }
+ }
+}
+
+// Entry point for eth protocol to add block hashes received via BlockHashesMsg
+// only hashes from the best peer is handled
+// this method is always responsible to initiate further hash requests until
+// a known parent is reached unless cancelled by a peerChange event
+// this process also launches all request processes on each chain section
+// this function needs to run asynchronously for one peer since the message is discarded???
+func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+
+ // check if this peer is the best
+ peer, best := self.getPeer(peerId)
+ if !best {
+ return
+ }
+ // peer is still the best
+
+ var child *poolNode
+ var depth int
+
+ // iterate using next (rlp stream lazy decoder) feeding hashesC
+ self.wg.Add(1)
+ go func() {
+ for {
+ select {
+ case <-self.quit:
+ return
+ case <-peer.quitC:
+ // if the peer is demoted, no more hashes taken
+ break
+ default:
+ hash, ok := next()
+ if !ok {
+ // message consumed chain skeleton built
+ break
+ }
+ // check if known block connecting the downloaded chain to our blockchain
+ if self.hasBlock(hash) {
+ poolLogger.Infof("known block (%x...)\n", hash[0:4])
+ if child != nil {
+ child.Lock()
+ // mark child as absolute pool root with parent known to blockchain
+ child.knownParent = true
+ child.Unlock()
+ }
+ break
+ }
+ //
+ var parent *poolNode
+ // look up node in pool
+ parent = self.get(hash)
+ if parent != nil {
+ // reached a known chain in the pool
+ // request blocks on the newly added part of the chain
+ if child != nil {
+ self.link(parent, child)
+
+ // activate the current chain
+ self.activateChain(parent, peer, true)
+ poolLogger.Debugf("potential chain of %v blocks added, reached blockpool, activate chain", depth)
+ break
+ }
+ // if this is the first hash, we expect to find it
+ parent.RLock()
+ grandParent := parent.parent
+ parent.RUnlock()
+ if grandParent != nil {
+ // activate the current chain
+ self.activateChain(parent, peer, true)
+ poolLogger.Debugf("block hash found, activate chain")
+ break
+ }
+ // the first node is the root of a chain in the pool, rejoice and continue
+ }
+ // if node does not exist, create it and index in the pool
+ section := &section{}
+ if child == nil {
+ section.top = parent
+ }
+ parent = &poolNode{
+ hash: hash,
+ child: child,
+ section: section,
+ peer: peerId,
+ }
+ self.set(hash, parent)
+ poolLogger.Debugf("create potential block for %x...", hash[0:4])
+
+ depth++
+ child = parent
+ }
+ }
+ if child != nil {
+ poolLogger.Debugf("chain of %v hashes added", depth)
+ // start a processSection on the last node, but switch off asking
+ // hashes and blocks until next peer confirms this chain
+ section := self.processSection(child)
+ peer.addSection(child.hash, section)
+ section.start()
+ }
+ }()
+}
+
+// AddBlock is the entry point for the eth protocol when blockmsg is received upon requests
+// It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error (which can be ignored)
+// block is checked for PoW
+// only the first PoW-valid block for a hash is considered legit
+func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
+ hash := block.Hash()
+ node := self.get(hash)
+ node.RLock()
+ b := node.block
+ node.RUnlock()
+ if b != nil {
+ return
+ }
+ if node == nil && !self.hasBlock(hash) {
+ self.peerError(peerId, ErrUnrequestedBlock, "%x", hash)
+ return
+ }
+ // validate block for PoW
+ if !self.verifyPoW(block) {
+ self.peerError(peerId, ErrInvalidPoW, "%x", hash)
+ }
+ node.Lock()
+ node.block = block
+ node.source = peerId
+ node.Unlock()
+}
+
+// iterates down a known poolchain and activates fetching processes
+// on each chain section for the peer
+// stops if the peer is demoted
+// registers last section root as root for the peer (in case peer is promoted a second time, to remember)
+func (self *BlockPool) activateChain(node *poolNode, peer *peerInfo, on bool) {
+ self.wg.Add(1)
+ go func() {
+ for {
+ node.sectionRLock()
+ bottom := node.section.bottom
+ if bottom == nil { // the chain section is being created or killed
+ break
+ }
+ // register this section with the peer
+ if peer != nil {
+ peer.addSection(bottom.hash, bottom.section)
+ if on {
+ bottom.section.start()
+ } else {
+ bottom.section.start()
+ }
+ }
+ if bottom.parent == nil {
+ node = bottom
+ break
+ }
+ // if peer demoted stop activation
+ select {
+ case <-peer.quitC:
+ break
+ default:
+ }
+
+ node = bottom.parent
+ bottom.sectionRUnlock()
+ }
+ // remember root for this peer
+ peer.addRoot(node)
+ self.wg.Done()
+ }()
+}
+
+// main worker thread on each section in the poolchain
+// - kills the section if there are blocks missing after an absolute time
+// - kills the section if there are maxIdleRounds of idle rounds of block requests with no response
+// - periodically polls the chain section for missing blocks which are then requested from peers
+// - registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they expire and killed ()
+// - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
+// - when turned back on it recursively calls itself on the root of the next chain section
+// - when exits, signals to
+func (self *BlockPool) processSection(node *poolNode) *section {
+ // absolute time after which sub-chain is killed if not complete (some blocks are missing)
+ suicideTimer := time.After(blockTimeout * time.Minute)
+ var blocksRequestTimer, blockHashesRequestTimer <-chan time.Time
+ var nodeC, missingC, processC chan *poolNode
+ controlC := make(chan bool)
+ resetC := make(chan bool)
+ var hashes [][]byte
+ var i, total, missing, lastMissing, depth int
+ var blockHashesRequests, blocksRequests int
+ var idle int
+ var init, alarm, done, same, running, once bool
+ orignode := node
+ hash := node.hash
+
+ node.sectionLock()
+ defer node.sectionUnlock()
+ section := &section{controlC: controlC, resetC: resetC}
+ node.section = section
+
+ go func() {
+ self.wg.Add(1)
+ for {
+ node.sectionRLock()
+ controlC = node.section.controlC
+ node.sectionRUnlock()
+
+ if init {
+ // missing blocks read from nodeC
+ // initialized section
+ if depth == 0 {
+ break
+ }
+ // enable select case to read missing block when ready
+ processC = missingC
+ missingC = make(chan *poolNode, lastMissing)
+ nodeC = nil
+ // only do once
+ init = false
+ } else {
+ if !once {
+ missingC = nil
+ processC = nil
+ i = 0
+ total = 0
+ lastMissing = 0
+ }
+ }
+
+ // went through all blocks in section
+ if i != 0 && i == lastMissing {
+ if len(hashes) > 0 {
+ // send block requests to peers
+ self.requestBlocks(blocksRequests, hashes)
+ }
+ blocksRequests++
+ poolLogger.Debugf("[%x] block request attempt %v: missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+ if missing == lastMissing {
+ // idle round
+ if same {
+ // more than once
+ idle++
+ // too many idle rounds
+ if idle > blocksRequestMaxIdleRounds {
+ poolLogger.Debugf("[%x] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", hash[0:4], idle, blocksRequests, missing, total, depth)
+ self.killChain(node, nil)
+ break
+ }
+ } else {
+ idle = 0
+ }
+ same = true
+ } else {
+ if missing == 0 {
+ // no missing nodes
+ poolLogger.Debugf("block request process complete on section %x... (%v total blocksRequests): missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)
+ node.Lock()
+ orignode.complete = true
+ node.Unlock()
+ blocksRequestTimer = nil
+ if blockHashesRequestTimer == nil {
+ // not waiting for hashes any more
+ poolLogger.Debugf("hash request on root %x... successful (%v total attempts)\nquitting...", hash[0:4], blockHashesRequests)
+ break
+ } // otherwise suicide if no hashes coming
+ }
+ same = false
+ }
+ lastMissing = missing
+ i = 0
+ missing = 0
+ // ready for next round
+ done = true
+ }
+ if done && alarm {
+ poolLogger.Debugf("start checking if new blocks arrived (attempt %v): missing %v/%v/%v", blocksRequests, missing, total, depth)
+ blocksRequestTimer = time.After(blocksRequestInterval * time.Second)
+ alarm = false
+ done = false
+ // processC supposed to be empty and never closed so just swap, no need to allocate
+ tempC := processC
+ processC = missingC
+ missingC = tempC
+ }
+ select {
+ case <-self.quit:
+ break
+ case <-suicideTimer:
+ self.killChain(node, nil)
+ poolLogger.Warnf("[%x] timeout. (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+ break
+ case <-blocksRequestTimer:
+ alarm = true
+ case <-blockHashesRequestTimer:
+ orignode.RLock()
+ parent := orignode.parent
+ orignode.RUnlock()
+ if parent != nil {
+ // if not root of chain, switch off
+ poolLogger.Debugf("[%x] parent found, hash requests deactivated (after %v total attempts)\n", hash[0:4], blockHashesRequests)
+ blockHashesRequestTimer = nil
+ } else {
+ blockHashesRequests++
+ poolLogger.Debugf("[%x] hash request on root (%v total attempts)\n", hash[0:4], blockHashesRequests)
+ self.requestBlockHashes(parent.hash)
+ blockHashesRequestTimer = time.After(blockHashesRequestInterval * time.Second)
+ }
+ case r, ok := <-controlC:
+ if !ok {
+ break
+ }
+ if running && !r {
+ poolLogger.Debugf("process on section %x... (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+
+ alarm = false
+ blocksRequestTimer = nil
+ blockHashesRequestTimer = nil
+ processC = nil
+ }
+ if !running && r {
+ poolLogger.Debugf("[%x] on", hash[0:4])
+
+ orignode.RLock()
+ parent := orignode.parent
+ complete := orignode.complete
+ knownParent := orignode.knownParent
+ orignode.RUnlock()
+ if !complete {
+ poolLogger.Debugf("[%x] activate block requests", hash[0:4])
+ blocksRequestTimer = time.After(0)
+ }
+ if parent == nil && !knownParent {
+ // if no parent but not connected to blockchain
+ poolLogger.Debugf("[%x] activate block hashes requests", hash[0:4])
+ blockHashesRequestTimer = time.After(0)
+ } else {
+ blockHashesRequestTimer = nil
+ }
+ alarm = true
+ processC = missingC
+ if !once {
+ // if not run at least once fully, launch iterator
+ processC = make(chan *poolNode)
+ missingC = make(chan *poolNode)
+ self.foldUp(orignode, processC)
+ once = true
+ }
+ }
+ total = lastMissing
+ case <-resetC:
+ once = false
+ init = false
+ done = false
+ case node, ok := <-processC:
+ if !ok {
+ // channel closed, first iteration finished
+ init = true
+ once = true
+ continue
+ }
+ i++
+ // if node has no block
+ node.RLock()
+ block := node.block
+ nhash := node.hash
+ knownParent := node.knownParent
+ node.RUnlock()
+ if !init {
+ depth++
+ }
+ if block == nil {
+ missing++
+ if !init {
+ total++
+ }
+ hashes = append(hashes, nhash)
+ if len(hashes) == blockBatchSize {
+ self.requestBlocks(blocksRequests, hashes)
+ hashes = nil
+ }
+ missingC <- node
+ } else {
+ // block is found
+ if knownParent {
+ // connected to the blockchain, insert the longest chain of blocks
+ var blocks types.Blocks
+ child := node
+ parent := node
+ node.sectionRLock()
+ for child != nil && child.block != nil {
+ parent = child
+ blocks = append(blocks, parent.block)
+ child = parent.child
+ }
+ node.sectionRUnlock()
+ poolLogger.Debugf("[%x] insert %v blocks into blockchain", hash[0:4], len(blocks))
+ if err := self.insertChain(blocks); err != nil {
+ // TODO: not clear which peer we need to address
+ // peerError should dispatch to peer if still connected and disconnect
+ self.peerError(node.source, ErrInvalidBlock, "%v", err)
+ poolLogger.Debugf("invalid block %v", node.hash)
+ poolLogger.Debugf("penalise peers %v (hash), %v (block)", node.peer, node.source)
+ // penalise peer in node.source
+ self.killChain(node, nil)
+ // self.disconnect()
+ break
+ }
+ // if suceeded mark the next one (no block yet) as connected to blockchain
+ if child != nil {
+ child.Lock()
+ child.knownParent = true
+ child.Unlock()
+ }
+ // reset starting node to first node with missing block
+ orignode = child
+ // pop the inserted ancestors off the channel
+ for i := 1; i < len(blocks); i++ {
+ <-processC
+ }
+ // delink inserted chain section
+ self.killChain(node, parent)
+ }
+ }
+ }
+ }
+ poolLogger.Debugf("[%x] quit after\n%v block hashes requests\n%v block requests: missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)
+
+ self.wg.Done()
+ node.sectionLock()
+ node.section.controlC = nil
+ node.sectionUnlock()
+ // this signals that controller not available
+ }()
+ return section
+
+}
+
+func (self *BlockPool) peerError(peerId string, code int, format string, params ...interface{}) {
+ self.peersLock.RLock()
+ defer self.peersLock.RUnlock()
+ peer, ok := self.peers[peerId]
+ if ok {
+ peer.peerError(code, format, params...)
+ }
+}
+
+func (self *BlockPool) requestBlockHashes(hash []byte) {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ if self.peer != nil {
+ self.peer.requestBlockHashes(hash)
+ }
+}
+
+func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) {
+ // distribute block request among known peers
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ peerCount := len(self.peers)
+ // on first attempt use the best peer
+ if attempts == 0 {
+ self.peer.requestBlocks(hashes)
+ return
+ }
+ repetitions := int(math.Min(float64(peerCount), float64(blocksRequestRepetition)))
+ poolLogger.Debugf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
+ i := 0
+ indexes := rand.Perm(peerCount)[0:(repetitions - 1)]
+ sort.Ints(indexes)
+ for _, peer := range self.peers {
+ if i == indexes[0] {
+ peer.requestBlocks(hashes)
+ indexes = indexes[1:]
+ if len(indexes) == 0 {
+ break
+ }
+ }
+ i++
+ }
+}
+
+func (self *BlockPool) getPeer(peerId string) (*peerInfo, bool) {
+ self.peersLock.RLock()
+ defer self.peersLock.RUnlock()
+ if self.peer != nil && self.peer.id == peerId {
+ return self.peer, true
+ }
+ info, ok := self.peers[peerId]
+ if !ok {
+ panic("unknown peer")
+ }
+ return info, false
+}
+
+func (self *peerInfo) addSection(hash []byte, section *section) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.sections[string(hash)] = section
+}
+
+func (self *peerInfo) addRoot(node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.roots = append(self.roots, node)
+}
+
+// (re)starts processes registered for this peer (self)
+func (self *peerInfo) start(peer *peerInfo) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.quitC = make(chan bool)
+ for _, root := range self.roots {
+ root.sectionRLock()
+ if root.section.bottom != nil {
+ if root.parent == nil {
+ self.requestBlockHashes(root.hash)
+ }
+ }
+ root.sectionRUnlock()
+ }
+ self.roots = nil
+ self.controlSections(peer, true)
+}
+
+// (re)starts process without requests, only suicide timer
+func (self *peerInfo) stop(peer *peerInfo) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ close(self.quitC)
+ self.controlSections(peer, false)
+}
+
+func (self *peerInfo) controlSections(peer *peerInfo, on bool) {
+ if peer != nil {
+ peer.lock.RLock()
+ defer peer.lock.RUnlock()
+ }
+ for hash, section := range peer.sections {
+ if section.done() {
+ delete(self.sections, hash)
+ }
+ _, exists := peer.sections[hash]
+ if on || peer == nil || exists {
+ if on {
+ // self is best peer
+ section.start()
+ } else {
+ // (re)starts process without requests, only suicide timer
+ section.stop()
+ }
+ }
+ }
+}
+
+// called when parent is found in pool
+// parent and child are guaranteed to be on different sections
+func (self *BlockPool) link(parent, child *poolNode) {
+ var top bool
+ parent.sectionLock()
+ if child != nil {
+ child.sectionLock()
+ }
+ if parent == parent.section.top && parent.section.top != nil {
+ top = true
+ }
+ var bottom bool
+
+ if child == child.section.bottom {
+ bottom = true
+ }
+ if parent.child != child {
+ orphan := parent.child
+ if orphan != nil {
+ // got a fork in the chain
+ if top {
+ orphan.lock.Lock()
+ // make old child orphan
+ orphan.parent = nil
+ orphan.lock.Unlock()
+ } else { // we are under section lock
+ // make old child orphan
+ orphan.parent = nil
+ // reset section objects above the fork
+ nchild := orphan.child
+ node := orphan
+ section := &section{bottom: orphan}
+ for node.section == nchild.section {
+ node = nchild
+ node.section = section
+ nchild = node.child
+ }
+ section.top = node
+ // set up a suicide
+ self.processSection(orphan).stop()
+ }
+ } else {
+ // child is on top of a chain need to close section
+ child.section.bottom = child
+ }
+ // adopt new child
+ parent.child = child
+ if !top {
+ parent.section.top = parent
+ // restart section process so that shorter section is scanned for blocks
+ parent.section.reset()
+ }
+ }
+
+ if child != nil {
+ if child.parent != parent {
+ stepParent := child.parent
+ if stepParent != nil {
+ if bottom {
+ stepParent.Lock()
+ stepParent.child = nil
+ stepParent.Unlock()
+ } else {
+ // we are on the same section
+ // if it is a aberrant reverse fork,
+ stepParent.child = nil
+ node := stepParent
+ nparent := stepParent.child
+ section := &section{top: stepParent}
+ for node.section == nparent.section {
+ node = nparent
+ node.section = section
+ node = node.parent
+ }
+ }
+ } else {
+ // linking to a root node, ie. parent is under the root of a chain
+ parent.section.top = parent
+ }
+ }
+ child.parent = parent
+ child.section.bottom = child
+ }
+ // this needed if someone lied about the parent before
+ child.knownParent = false
+
+ parent.sectionUnlock()
+ if child != nil {
+ child.sectionUnlock()
+ }
+}
+
+// this immediately kills the chain from node to end (inclusive) section by section
+func (self *BlockPool) killChain(node *poolNode, end *poolNode) {
+ poolLogger.Debugf("kill chain section with root node %v", node)
+
+ node.sectionLock()
+ node.section.abort()
+ self.set(node.hash, nil)
+ child := node.child
+ top := node.section.top
+ i := 1
+ self.wg.Add(1)
+ go func() {
+ var quit bool
+ for node != top && node != end && child != nil {
+ node = child
+ select {
+ case <-self.quit:
+ quit = true
+ break
+ default:
+ }
+ self.set(node.hash, nil)
+ child = node.child
+ }
+ poolLogger.Debugf("killed chain section of %v blocks with root node %v", i, node)
+ if !quit {
+ if node == top {
+ if node != end && child != nil && end != nil {
+ //
+ self.killChain(child, end)
+ }
+ } else {
+ if child != nil {
+ // delink rest of this section if ended midsection
+ child.section.bottom = child
+ child.parent = nil
+ }
+ }
+ }
+ node.section.bottom = nil
+ node.sectionUnlock()
+ self.wg.Done()
+ }()
+}
+
+// structure to store long range links on chain to skip along
+type section struct {
+ lock sync.RWMutex
+ bottom *poolNode
+ top *poolNode
+ controlC chan bool
+ resetC chan bool
+}
+
+func (self *section) start() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.controlC <- true
+ }
+}
+
+func (self *section) stop() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.controlC <- false
+ }
+}
+
+func (self *section) reset() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.resetC <- true
+ self.controlC <- false
+ }
+}
+
+func (self *section) abort() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.controlC != nil {
+ close(self.controlC)
+ self.controlC = nil
+ }
+}
+
+func (self *section) done() bool {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.controlC != nil {
+ return true
+ }
+ return false
+}
+
+func (self *BlockPool) get(hash []byte) (node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.pool[string(hash)]
+}
+
+func (self *BlockPool) set(hash []byte, node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.pool[string(hash)] = node
+}
+
+// first time for block request, this iteration retrieves nodes of the chain
+// from node up to top (all the way if nil) via child links
+// copies the controller
+// and feeds nodeC channel
+// this is performed under section readlock to prevent top from going away
+// when
+func (self *BlockPool) foldUp(node *poolNode, nodeC chan *poolNode) {
+ self.wg.Add(1)
+ go func() {
+ node.sectionRLock()
+ defer node.sectionRUnlock()
+ for node != nil {
+ select {
+ case <-self.quit:
+ break
+ case nodeC <- node:
+ if node == node.section.top {
+ break
+ }
+ node = node.child
+ }
+ }
+ close(nodeC)
+ self.wg.Done()
+ }()
+}
+
+func (self *poolNode) Lock() {
+ self.sectionLock()
+ self.lock.Lock()
+}
+
+func (self *poolNode) Unlock() {
+ self.lock.Unlock()
+ self.sectionUnlock()
+}
+
+func (self *poolNode) RLock() {
+ self.lock.RLock()
+}
+
+func (self *poolNode) RUnlock() {
+ self.lock.RUnlock()
+}
+
+func (self *poolNode) sectionLock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.Lock()
+}
+
+func (self *poolNode) sectionUnlock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.Unlock()
+}
+
+func (self *poolNode) sectionRLock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.RLock()
+}
+
+func (self *poolNode) sectionRUnlock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.RUnlock()
+}