aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go177
1 files changed, 119 insertions, 58 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 88ceeb5ac..1bbba11ed 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -3,6 +3,7 @@ package downloader
import (
"bytes"
"errors"
+ "math"
"math/rand"
"sync"
"sync/atomic"
@@ -28,25 +29,27 @@ var (
crossCheckCycle = time.Second // Period after which to check for expired cross checks
maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
+ maxBlockProcess = 256 // Number of blocks to import at once into the chain
)
var (
- errBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer is unknown or unhealthy")
- errBadPeer = errors.New("action from bad peer ignored")
- errStallingPeer = errors.New("peer is stalling")
- errBannedHead = errors.New("peer head hash already banned")
- errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
- errTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- errInvalidChain = errors.New("retrieved hash chain is invalid")
- errCrossCheckFailed = errors.New("block cross-check failed")
- errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
- errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errBannedHead = errors.New("peer head hash already banned")
+ errNoPeers = errors.New("no peers to keep download active")
+ errPendingQueue = errors.New("pending items in queue")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
+ errAlreadyInPool = errors.New("hash already in pool")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errCrossCheckFailed = errors.New("block cross-check failed")
+ errCancelHashFetch = errors.New("hash fetching canceled (requested)")
+ errCancelBlockFetch = errors.New("block downloading canceled (requested)")
+ errCancelChainImport = errors.New("chain importing canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
@@ -55,6 +58,9 @@ type hashCheckFn func(common.Hash) bool
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block
+// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type chainInsertFn func(types.Blocks) (int, error)
+
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
@@ -88,13 +94,15 @@ type Downloader struct {
importLock sync.Mutex
// Callbacks
- hasBlock hashCheckFn // Checks if a block is present in the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- dropPeer peerDropFn // Retrieved the TD of our own chain
+ hasBlock hashCheckFn // Checks if a block is present in the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ insertChain chainInsertFn // Injects a batch of blocks into the chain
+ dropPeer peerDropFn // Retrieved the TD of our own chain
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
+ processing int32
notified int32
// Channels
@@ -113,18 +121,19 @@ type Block struct {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader {
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
// Create the base downloader
downloader := &Downloader{
- mux: mux,
- queue: newQueue(),
- peers: newPeerSet(),
- hasBlock: hasBlock,
- getBlock: getBlock,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- hashCh: make(chan hashPack, 1),
- blockCh: make(chan blockPack, 1),
+ mux: mux,
+ queue: newQueue(),
+ peers: newPeerSet(),
+ hasBlock: hasBlock,
+ getBlock: getBlock,
+ insertChain: insertChain,
+ dropPeer: dropPeer,
+ newPeerCh: make(chan *peer, 1),
+ hashCh: make(chan hashPack, 1),
+ blockCh: make(chan blockPack, 1),
}
// Inject all the known bad hashes
downloader.banned = set.New()
@@ -157,7 +166,7 @@ func (d *Downloader) Stats() (pending int, cached int, importing int, estimate t
return
}
-// Synchronising returns the state of the downloader
+// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
}
@@ -260,19 +269,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
return d.syncWithPeer(p, hash)
}
-// TakeBlocks takes blocks from the queue and yields them to the caller.
-func (d *Downloader) TakeBlocks() []*Block {
- blocks := d.queue.TakeBlocks()
- if len(blocks) > 0 {
- d.importLock.Lock()
- d.importStart = time.Now()
- d.importQueue = blocks
- d.importDone = 0
- d.importLock.Unlock()
- }
- return blocks
-}
-
// Has checks if the downloader knows about a particular hash, meaning that its
// either already downloaded of pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
@@ -307,19 +303,16 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
// Cancel cancels all of the operations and resets the queue. It returns true
// if the cancel operation was completed.
-func (d *Downloader) Cancel() bool {
- // If we're not syncing just return.
- hs, bs := d.queue.Size()
- if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
- return false
- }
+func (d *Downloader) Cancel() {
// Close the current cancel channel
d.cancelLock.Lock()
- select {
- case <-d.cancelCh:
- // Channel was already closed
- default:
- close(d.cancelCh)
+ if d.cancelCh != nil {
+ select {
+ case <-d.cancelCh:
+ // Channel was already closed
+ default:
+ close(d.cancelCh)
+ }
}
d.cancelLock.Unlock()
@@ -330,11 +323,11 @@ func (d *Downloader) Cancel() bool {
d.importQueue = nil
d.importDone = 0
d.importLock.Unlock()
-
- return true
}
-// XXX Make synchronous
+// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
+// up until it finds a common ancestor. If the source peer times out, alternative
+// ones are tried for continuation.
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
var (
start = time.Now()
@@ -530,10 +523,13 @@ out:
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
break
}
- // All was successful, promote the peer
+ // All was successful, promote the peer and potentially start processing
peer.Promote()
peer.SetIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+ if atomic.LoadInt32(&d.processing) == 0 {
+ go d.process()
+ }
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -709,6 +705,71 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
}
}
+// process takes blocks from the queue and tries to import them into the chain.
+func (d *Downloader) process() (err error) {
+ // Make sure only one goroutine is ever allowed to process blocks at once
+ if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
+ return
+ }
+ // If the processor just exited, but there are freshly pending items, try to
+ // reenter. This is needed because the goroutine spinned up for processing
+ // the fresh blocks might have been rejected entry to to this present thread
+ // not yet releasing the `processing` state.
+ defer func() {
+ if err == nil && d.queue.GetHeadBlock() != nil {
+ err = d.process()
+ }
+ }()
+ // Release the lock upon exit (note, before checking for reentry!)
+ defer atomic.StoreInt32(&d.processing, 0)
+
+ // Fetch the current cancel channel to allow termination
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
+
+ // Repeat the processing as long as there are blocks to import
+ for {
+ // Fetch the next batch of blocks
+ blocks := d.queue.TakeBlocks()
+ if len(blocks) == 0 {
+ return nil
+ }
+ // Reset the import statistics
+ d.importLock.Lock()
+ d.importStart = time.Now()
+ d.importQueue = blocks
+ d.importDone = 0
+ d.importLock.Unlock()
+
+ // Actually import the blocks
+ glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
+ for len(blocks) != 0 { // TODO: quit
+ // Check for any termination requests
+ select {
+ case <-cancel:
+ return errCancelChainImport
+ default:
+ }
+ // Retrieve the first batch of blocks to insert
+ max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
+ raw := make(types.Blocks, 0, max)
+ for _, block := range blocks[:max] {
+ raw = append(raw, block.RawBlock)
+ }
+ // Try to inset the blocks, drop the originating peer if there's an error
+ index, err := d.insertChain(raw)
+ if err != nil {
+ glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err)
+ d.dropPeer(blocks[index].OriginPeer)
+ d.Cancel()
+ return errCancelChainImport
+ }
+ blocks = blocks[max:]
+ }
+ }
+}
+
// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {