Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--  eth/downloader/downloader.go | 294
1 file changed, 221 insertions(+), 73 deletions(-)
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 29b627771..306c4fd2d 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -3,6 +3,7 @@ package downloader
import (
"bytes"
"errors"
+ "math"
"math/rand"
"sync"
"sync/atomic"
@@ -28,32 +29,40 @@ var (
crossCheckCycle = time.Second // Period after which to check for expired cross checks
maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
+ maxBlockProcess = 256 // Number of blocks to import at once into the chain
)
var (
- errLowTd = errors.New("peers TD is too low")
- ErrBusy = errors.New("busy")
- errUnknownPeer = errors.New("peer is unknown or unhealthy")
- ErrBadPeer = errors.New("action from bad peer ignored")
- ErrStallingPeer = errors.New("peer is stalling")
- errBannedHead = errors.New("peer head hash already banned")
- errNoPeers = errors.New("no peers to keep download active")
- ErrPendingQueue = errors.New("pending items in queue")
- ErrTimeout = errors.New("timeout")
- ErrEmptyHashSet = errors.New("empty hash set by peer")
- errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
- errAlreadyInPool = errors.New("hash already in pool")
- ErrInvalidChain = errors.New("retrieved hash chain is invalid")
- ErrCrossCheckFailed = errors.New("block cross-check failed")
- errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
- errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
- errNoSyncActive = errors.New("no sync active")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer is unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
+ errBannedHead = errors.New("peer head hash already banned")
+ errNoPeers = errors.New("no peers to keep download active")
+ errPendingQueue = errors.New("pending items in queue")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
+ errAlreadyInPool = errors.New("hash already in pool")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errCrossCheckFailed = errors.New("block cross-check failed")
+ errCancelHashFetch = errors.New("hash fetching canceled (requested)")
+ errCancelBlockFetch = errors.New("block downloading canceled (requested)")
+ errCancelChainImport = errors.New("chain importing canceled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
+// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
type hashCheckFn func(common.Hash) bool
-type getBlockFn func(common.Hash) *types.Block
+
+// blockRetrievalFn is a callback type for retrieving a block from the local chain.
+type blockRetrievalFn func(common.Hash) *types.Block
+
+// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)
-type hashIterFn func() (common.Hash, error)
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
type blockPack struct {
peerId string
@@ -78,13 +87,23 @@ type Downloader struct {
checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
banned *set.Set // Set of hashes we've received and banned
+ // Statistics
+ importStart time.Time // Instant when the last blocks were taken from the cache
+ importQueue []*Block // Previously taken blocks to check import progress
+ importDone int // Number of taken blocks already imported from the last batch
+ importLock sync.Mutex
+
// Callbacks
- hasBlock hashCheckFn
- getBlock getBlockFn
+ hasBlock hashCheckFn // Checks if a block is present in the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ insertChain chainInsertFn // Injects a batch of blocks into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
- synchronising int32
- notified int32
+ synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
+ synchronising int32
+ processing int32
+ notified int32
// Channels
newPeerCh chan *peer
@@ -101,17 +120,20 @@ type Block struct {
OriginPeer string
}
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
+// New creates a new downloader to fetch hashes and blocks from remote peers.
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
// Create the base downloader
downloader := &Downloader{
- mux: mux,
- queue: newQueue(),
- peers: newPeerSet(),
- hasBlock: hasBlock,
- getBlock: getBlock,
- newPeerCh: make(chan *peer, 1),
- hashCh: make(chan hashPack, 1),
- blockCh: make(chan blockPack, 1),
+ mux: mux,
+ queue: newQueue(),
+ peers: newPeerSet(),
+ hasBlock: hasBlock,
+ getBlock: getBlock,
+ insertChain: insertChain,
+ dropPeer: dropPeer,
+ newPeerCh: make(chan *peer, 1),
+ hashCh: make(chan hashPack, 1),
+ blockCh: make(chan blockPack, 1),
}
// Inject all the known bad hashes
downloader.banned = set.New()
@@ -121,11 +143,30 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa
return downloader
}
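
For context, the extended constructor is meant to be handed the local chain's accessors and a peer-removal hook. A minimal wiring sketch, where `chain` and `removePeer` are hypothetical stand-ins for the eth protocol manager's chain manager and its disconnect helper:

	dl := New(
		mux,               // event mux for posting sync start/done/failed events
		chain.HasBlock,    // hashCheckFn: reports whether a hash is already in the chain
		chain.GetBlock,    // blockRetrievalFn: fetches a block from the chain by hash
		chain.InsertChain, // chainInsertFn: imports a block batch, returning the failing index and error
		removePeer,        // peerDropFn: disconnects a peer judged malicious or useless
	)
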
-func (d *Downloader) Stats() (current int, max int) {
- return d.queue.Size()
+// Stats retrieves the current status of the downloader.
+func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
+ // Fetch the download status
+ pending, cached = d.queue.Size()
+
+ // Figure out the import progress
+ d.importLock.Lock()
+ defer d.importLock.Unlock()
+
+ for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
+ d.importQueue = d.importQueue[1:]
+ d.importDone++
+ }
+ importing = len(d.importQueue)
+
+ // Make an estimate on the total sync
+ estimate = 0
+ if d.importDone > 0 {
+ estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
+ }
+ return
}
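
The estimate is plain linear extrapolation: the average import time per block so far, scaled by the blocks still pending, cached, or mid-import. A sketch with illustrative numbers only:

	elapsed := 10 * time.Second // stand-in for time.Since(d.importStart)
	done := 50                  // stand-in for d.importDone
	remaining := 100 + 40 + 10  // pending + cached + importing
	estimate := elapsed / time.Duration(done) * time.Duration(remaining)
	// 10s / 50 blocks = 200ms per block; 150 blocks left => estimate == 30s
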
-// Synchronising returns the state of the downloader
+// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
}
@@ -158,19 +199,47 @@ func (d *Downloader) UnregisterPeer(id string) error {
return nil
}
-// Synchronise will select the peer and use it for synchronising. If an empty string is given
+// Synchronise tries to sync up our local block chain with a remote peer, both
+// adding various sanity checks as well as wrapping it with various log entries.
+func (d *Downloader) Synchronise(id string, head common.Hash) {
+ glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)
+
+ switch err := d.synchronise(id, head); err {
+ case nil:
+ glog.V(logger.Detail).Infof("Synchronisation completed")
+
+ case errBusy:
+ glog.V(logger.Detail).Infof("Synchronisation already in progress")
+
+ case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed:
+ glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
+ d.dropPeer(id)
+
+ case errPendingQueue:
+ glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
+
+ default:
+ glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
+ }
+}
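
A hypothetical caller on the protocol-handler side (names are illustrative; the handler is assumed to track its best peer by total difficulty):

	if best := pm.peers.BestPeer(); best != nil {
		// Fire-and-forget: Synchronise logs and classifies its own errors,
		// dropping the peer itself on protocol violations.
		go pm.downloader.Synchronise(best.id, best.Head())
	}
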
+
+// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronise if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous.
-func (d *Downloader) Synchronise(id string, hash common.Hash) error {
+func (d *Downloader) synchronise(id string, hash common.Hash) error {
+ // Mock out the synchronisation if testing
+ if d.synchroniseMock != nil {
+ return d.synchroniseMock(id, hash)
+ }
// Make sure only one goroutine is ever allowed past this point at once
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
- return ErrBusy
+ return errBusy
}
defer atomic.StoreInt32(&d.synchronising, 0)
// If the head hash is banned, terminate immediately
if d.banned.Has(hash) {
- return ErrInvalidChain
+ return errBannedHead
}
// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
@@ -184,7 +253,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
- return ErrPendingQueue
+ return errPendingQueue
}
// Reset the queue and peer set to clean any internal leftover state
d.queue.Reset()
@@ -200,11 +269,6 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
return d.syncWithPeer(p, hash)
}
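
The busy-guard at the top of synchronise is a reusable pattern: a compare-and-swap wins entry for exactly one goroutine, and a deferred store releases it on every exit path. A generic sketch of the same idiom (also used by the processing flag further down):

	var running int32 // 0 = idle, 1 = busy

	func runExclusive(work func() error) error {
		if !atomic.CompareAndSwapInt32(&running, 0, 1) {
			return errBusy // another goroutine is already inside
		}
		defer atomic.StoreInt32(&running, 0) // release on every exit path
		return work()
	}
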
-// TakeBlocks takes blocks from the queue and yields them to the caller.
-func (d *Downloader) TakeBlocks() []*Block {
- return d.queue.TakeBlocks()
-}
-
// Has checks if the downloader knows about a particular hash, meaning that it's
// either already downloaded or pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
@@ -239,29 +303,26 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
// Cancel cancels all of the operations and resets the queue. It returns true
// if the cancel operation was completed.
-func (d *Downloader) Cancel() bool {
- // If we're not syncing just return.
- hs, bs := d.queue.Size()
- if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
- return false
- }
+func (d *Downloader) Cancel() {
// Close the current cancel channel
d.cancelLock.Lock()
- select {
- case <-d.cancelCh:
- // Channel was already closed
- default:
- close(d.cancelCh)
+ if d.cancelCh != nil {
+ select {
+ case <-d.cancelCh:
+ // Channel was already closed
+ default:
+ close(d.cancelCh)
+ }
}
d.cancelLock.Unlock()
- // reset the queue
+ // Reset the queue
d.queue.Reset()
-
- return true
}
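
On the consumer side, workers snapshot the current cancel channel under the lock and then select on it alongside their normal work, which is exactly how process retrieves it below. A condensed sketch:

	d.cancelLock.RLock()
	cancel := d.cancelCh // snapshot: a later sync session replaces the channel
	d.cancelLock.RUnlock()

	select {
	case <-cancel:
		return errCancelBlockFetch // sync was aborted
	case pack := <-d.blockCh:
		_ = pack // handle the delivered blocks as usual
	}
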
-// XXX Make synchronous
+// fetchHashes starts retrieving hashes backwards from a specific peer and hash,
+// up until it finds a common ancestor. If the source peer times out, alternative
+// ones are tried for continuation.
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
var (
start = time.Now()
@@ -279,7 +340,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
<-timeout.C // timeout channel should be initially empty.
getHashes := func(from common.Hash) {
- active.getHashes(from)
+ go active.getHashes(from)
timeout.Reset(hashTTL)
}
@@ -304,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 {
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
- return ErrEmptyHashSet
+ return errEmptyHashSet
}
for index, hash := range hashPack.hashes {
if d.banned.Has(hash) {
@@ -314,7 +375,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
if err := d.banBlocks(active.id, hash); err != nil {
glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
}
- return ErrInvalidChain
+ return errInvalidChain
}
}
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
@@ -331,12 +392,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
inserts := d.queue.Insert(hashPack.hashes)
if len(inserts) == 0 && !done {
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
- return ErrBadPeer
+ return errBadPeer
}
if !done {
// Check that the peer is not stalling the sync
if len(inserts) < MinHashFetch {
- return ErrStallingPeer
+ return errStallingPeer
}
// Try and fetch a random block to verify the hash batch
// Skip the last hash as the cross check races with the next hash fetch
@@ -348,9 +409,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
expire: time.Now().Add(blockSoftTTL),
parent: parent,
}
- active.getBlocks([]common.Hash{origin})
+ go active.getBlocks([]common.Hash{origin})
- // Also fetch a fresh
+ // Also fetch a fresh batch of hashes
getHashes(head)
continue
}
@@ -370,7 +431,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
block := blockPack.blocks[0]
if check, ok := d.checks[block.Hash()]; ok {
if block.ParentHash() != check.parent {
- return ErrCrossCheckFailed
+ return errCrossCheckFailed
}
delete(d.checks, block.Hash())
}
@@ -380,7 +441,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
for hash, check := range d.checks {
if time.Now().After(check.expire) {
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
- return ErrCrossCheckFailed
+ return errCrossCheckFailed
}
}
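
For reference, the crossCheck entries consulted in this expiry loop carry just a deadline and the expected parent; a paraphrase of the struct as used above, not a new API:

	type crossCheck struct {
		expire time.Time   // deadline by which the peer must return the block
		parent common.Hash // parent hash the delivered block must link to
	}
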
@@ -400,7 +461,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// if all peers have been tried, abort the process entirely or if the hash is
// the zero hash.
if p == nil || (head == common.Hash{}) {
- return ErrTimeout
+ return errTimeout
}
// set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer.
@@ -457,12 +518,13 @@ out:
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
break
}
- // All was successful, promote the peer
+ // All was successful, promote the peer and potentially start processing
peer.Promote()
peer.SetIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+ go d.process()
- case ErrInvalidChain:
+ case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
return err
@@ -579,7 +641,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
return errCancelBlockFetch
case <-timeout:
- return ErrTimeout
+ return errTimeout
case <-d.hashCh:
// Out of bounds hashes received, ignore them
@@ -636,6 +698,92 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
}
}
+// process takes blocks from the queue and tries to import them into the chain.
+//
+// The algorithmic flow is as follows:
+// - The `processing` flag is swapped to 1 to ensure singleton access
+// - The current `cancel` channel is retrieved to detect sync abortions
+// - Blocks are iteratively taken from the cache and inserted into the chain
+// - When the cache becomes empty, insertion stops
+// - The `processing` flag is swapped back to 0
+// - A post-exit check is made whether new blocks became available
+// - This step is important: it handles a potential race condition between
+// checking for no more work, and releasing the processing "mutex". In
+// between these state changes, a block may have arrived, but a processing
+// attempt denied, so we need to re-enter to ensure the block isn't left
+// to idle in the cache.
+func (d *Downloader) process() (err error) {
+ // Make sure only one goroutine is ever allowed to process blocks at once
+ if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
+ return
+ }
+ // If the processor just exited, but there are freshly pending items, try to
+ // reenter. This is needed because the goroutine spun up for processing
+ // the fresh blocks might have been denied entry due to this present thread
+ // not yet having released the `processing` state.
+ defer func() {
+ if err == nil && d.queue.GetHeadBlock() != nil {
+ err = d.process()
+ }
+ }()
+ // Release the lock upon exit (note, before checking for reentry!), and set
+ // the import statistics to zero.
+ defer func() {
+ d.importLock.Lock()
+ d.importQueue = nil
+ d.importDone = 0
+ d.importLock.Unlock()
+
+ atomic.StoreInt32(&d.processing, 0)
+ }()
+
+ // Fetch the current cancel channel to allow termination
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
+
+ // Repeat the processing as long as there are blocks to import
+ for {
+ // Fetch the next batch of blocks
+ blocks := d.queue.TakeBlocks()
+ if len(blocks) == 0 {
+ return nil
+ }
+ // Reset the import statistics
+ d.importLock.Lock()
+ d.importStart = time.Now()
+ d.importQueue = blocks
+ d.importDone = 0
+ d.importLock.Unlock()
+
+ // Actually import the blocks
+ glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
+ for len(blocks) != 0 { // TODO: quit
+ // Check for any termination requests
+ select {
+ case <-cancel:
+ return errCancelChainImport
+ default:
+ }
+ // Retrieve the first batch of blocks to insert
+ max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
+ raw := make(types.Blocks, 0, max)
+ for _, block := range blocks[:max] {
+ raw = append(raw, block.RawBlock)
+ }
+ // Try to insert the blocks, drop the originating peer if there's an error
+ index, err := d.insertChain(raw)
+ if err != nil {
+ glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
+ d.dropPeer(blocks[index].OriginPeer)
+ d.Cancel()
+ return errCancelChainImport
+ }
+ blocks = blocks[max:]
+ }
+ }
+}
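
A side note on the batch sizing in process above: the float round-trip through math.Min is safe at these magnitudes, but an integer helper avoids the conversions entirely. A sketch (Go's built-in min only arrived much later, in 1.21):

	func intMin(a, b int) int {
		if a < b {
			return a
		}
		return b
	}

	// max := intMin(len(blocks), maxBlockProcess)
	// raw := make(types.Blocks, 0, max)
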
+
// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {