aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go282
1 files changed, 206 insertions, 76 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 85531ce15..29b627771 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1,35 +1,33 @@
package downloader
import (
+ "bytes"
"errors"
"math/rand"
"sync"
"sync/atomic"
"time"
- "gopkg.in/fatih/set.v0"
-
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "gopkg.in/fatih/set.v0"
)
-const (
+var (
MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling
MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
- peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
- hashTTL = 5 * time.Second // Time it takes for a hash request to time out
-)
+ hashTTL = 5 * time.Second // Time it takes for a hash request to time out
+ blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth
+ blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired
+ crossCheckCycle = time.Second // Period after which to check for expired cross checks
-var (
- blockTTL = 5 * time.Second // Time it takes for a block request to time out
- crossCheckCycle = time.Second // Period after which to check for expired cross checks
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
)
var (
@@ -38,10 +36,11 @@ var (
errUnknownPeer = errors.New("peer is unknown or unhealthy")
ErrBadPeer = errors.New("action from bad peer ignored")
ErrStallingPeer = errors.New("peer is stalling")
+ errBannedHead = errors.New("peer head hash already banned")
errNoPeers = errors.New("no peers to keep download active")
ErrPendingQueue = errors.New("pending items in queue")
ErrTimeout = errors.New("timeout")
- errEmptyHashSet = errors.New("empty hash set by peer")
+ ErrEmptyHashSet = errors.New("empty hash set by peer")
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
errAlreadyInPool = errors.New("hash already in pool")
ErrInvalidChain = errors.New("retrieved hash chain is invalid")
@@ -74,11 +73,10 @@ type crossCheck struct {
type Downloader struct {
mux *event.TypeMux
- mu sync.RWMutex
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
- banned *set.SetNonTS // Set of hashes we've received and banned
+ banned *set.Set // Set of hashes we've received and banned
// Callbacks
hasBlock hashCheckFn
@@ -116,7 +114,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa
blockCh: make(chan blockPack, 1),
}
// Inject all the known bad hashes
- downloader.banned = set.NewNonTS()
+ downloader.banned = set.New()
for hash, _ := range core.BadHashes {
downloader.banned.Add(hash)
}
@@ -135,6 +133,12 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
+ // If the peer wants to send a banned hash, reject
+ if d.banned.Has(head) {
+ glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id)
+ return errBannedHead
+ }
+ // Otherwise try to construct and register the peer
glog.V(logger.Detail).Infoln("Registering peer", id)
if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err)
@@ -164,6 +168,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
}
defer atomic.StoreInt32(&d.synchronising, 0)
+ // If the head hash is banned, terminate immediately
+ if d.banned.Has(hash) {
+ return ErrInvalidChain
+ }
// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started")
@@ -197,6 +205,8 @@ func (d *Downloader) TakeBlocks() []*Block {
return d.queue.TakeBlocks()
}
+// Has checks if the downloader knows about a particular hash, meaning that its
+// either already downloaded of pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
return d.queue.Has(hash)
}
@@ -253,23 +263,29 @@ func (d *Downloader) Cancel() bool {
// XXX Make synchronous
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
- glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
-
- start := time.Now()
-
- // Add the hash to the queue first, and start hash retrieval
- d.queue.Insert([]common.Hash{h})
- p.getHashes(h)
-
var (
+ start = time.Now()
active = p // active peer will help determine the current active peer
head = common.Hash{} // common and last hash
- timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
+ timeout = time.NewTimer(0) // timer to dump a non-responsive active peer
attempted = make(map[string]bool) // attempted peers will help with retries
crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
)
defer crossTicker.Stop()
+ defer timeout.Stop()
+
+ glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
+ <-timeout.C // timeout channel should be initially empty.
+
+ getHashes := func(from common.Hash) {
+ active.getHashes(from)
+ timeout.Reset(hashTTL)
+ }
+
+ // Add the hash to the queue, and start hash retrieval.
+ d.queue.Insert([]common.Hash{h})
+ getHashes(h)
attempted[p.id] = true
for finished := false; !finished; {
@@ -280,19 +296,24 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
case hashPack := <-d.hashCh:
// Make sure the active peer is giving us the hashes
if hashPack.peerId != active.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
break
}
- timeout.Reset(hashTTL)
+ timeout.Stop()
// Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 {
- glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
- return errEmptyHashSet
+ glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
+ return ErrEmptyHashSet
}
- for _, hash := range hashPack.hashes {
+ for index, hash := range hashPack.hashes {
if d.banned.Has(hash) {
- glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain\n", active.id)
+ glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id)
+
+ d.queue.Insert(hashPack.hashes[:index+1])
+ if err := d.banBlocks(active.id, hash); err != nil {
+ glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
+ }
return ErrInvalidChain
}
}
@@ -300,7 +321,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
done, index := false, 0
for index, head = range hashPack.hashes {
if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
- glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
+ glog.V(logger.Debug).Infof("Found common hash %x", head[:4])
hashPack.hashes = hashPack.hashes[:index]
done = true
break
@@ -309,7 +330,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// Insert all the new hashes, but only continue if got something useful
inserts := d.queue.Insert(hashPack.hashes)
if len(inserts) == 0 && !done {
- glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
+ glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
return ErrBadPeer
}
if !done {
@@ -324,21 +345,21 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent)
d.checks[origin] = &crossCheck{
- expire: time.Now().Add(blockTTL),
+ expire: time.Now().Add(blockSoftTTL),
parent: parent,
}
active.getBlocks([]common.Hash{origin})
// Also fetch a fresh
- active.getHashes(head)
+ getHashes(head)
continue
}
- // We're done, allocate the download cache and proceed pulling the blocks
+ // We're done, prepare the download cache and proceed pulling the blocks
offset := 0
if block := d.getBlock(head); block != nil {
offset = int(block.NumberU64() + 1)
}
- d.queue.Alloc(offset)
+ d.queue.Prepare(offset)
finished = true
case blockPack := <-d.blockCh:
@@ -364,7 +385,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
}
case <-timeout.C:
- glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
+ glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id)
var p *peer // p will be set if a peer can be found
// Attempt to find a new peer by checking inclusion of peers best hash in our
@@ -384,11 +405,11 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer.
active = p
- p.getHashes(head)
- glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
+ getHashes(head)
+ glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
}
}
- glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
+ glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start))
return nil
}
@@ -400,71 +421,92 @@ func (d *Downloader) fetchBlocks() error {
glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
start := time.Now()
- // default ticker for re-fetching blocks every now and then
+ // Start a ticker to continue throttled downloads and check for bad peers
ticker := time.NewTicker(20 * time.Millisecond)
+ defer ticker.Stop()
+
out:
for {
select {
case <-d.cancelCh:
return errCancelBlockFetch
+ case <-d.hashCh:
+ // Out of bounds hashes received, ignore them
+
case blockPack := <-d.blockCh:
// Short circuit if it's a stale cross check
if len(blockPack.blocks) == 1 {
block := blockPack.blocks[0]
if _, ok := d.checks[block.Hash()]; ok {
delete(d.checks, block.Hash())
- continue
+ break
}
}
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(blockPack.peerId); peer != nil {
- // Deliver the received chunk of blocks
- if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
- if err == ErrInvalidChain {
- // The hash chain is invalid (blocks are not ordered properly), abort
- return err
+ // Deliver the received chunk of blocks, and demote in case of errors
+ err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
+ switch err {
+ case nil:
+ // If no blocks were delivered, demote the peer (need the delivery above)
+ if len(blockPack.blocks) == 0 {
+ peer.Demote()
+ peer.SetIdle()
+ glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+ break
}
- // Peer did deliver, but some blocks were off, penalize
- glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
+ // All was successful, promote the peer
+ peer.Promote()
+ peer.SetIdle()
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+
+ case ErrInvalidChain:
+ // The hash chain is invalid (blocks are not ordered properly), abort
+ return err
+
+ case errNoFetchesPending:
+ // Peer probably timed out with its delivery but came through
+ // in the end, demote, but allow to to pull from this peer.
peer.Demote()
- break
- }
- if glog.V(logger.Debug) && len(blockPack.blocks) > 0 {
- glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
+ peer.SetIdle()
+ glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
+
+ case errStaleDelivery:
+ // Delivered something completely else than requested, usually
+ // caused by a timeout and delivery during a new sync cycle.
+ // Don't set it to idle as the original request should still be
+ // in flight.
+ peer.Demote()
+ glog.V(logger.Detail).Infof("%s: stale delivery", peer)
+
+ default:
+ // Peer did something semi-useful, demote but keep it around
+ peer.Demote()
+ peer.SetIdle()
+ glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
}
- // Promote the peer and update it's idle state
- peer.Promote()
- peer.SetIdle()
}
+
case <-ticker.C:
- // Check for bad peers. Bad peers may indicate a peer not responding
- // to a `getBlocks` message. A timeout of 5 seconds is set. Peers
- // that badly or poorly behave are removed from the peer set (not banned).
- // Bad peers are excluded from the available peer set and therefor won't be
- // reused. XXX We could re-introduce peers after X time.
- badPeers := d.queue.Expire(blockTTL)
+ // Short circuit if we lost all our peers
+ if d.peers.Len() == 0 {
+ return errNoPeers
+ }
+ // Check for block request timeouts and demote the responsible peers
+ badPeers := d.queue.Expire(blockHardTTL)
for _, pid := range badPeers {
- // XXX We could make use of a reputation system here ranking peers
- // in their performance
- // 1) Time for them to respond;
- // 2) Measure their speed;
- // 3) Amount and availability.
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
+ glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
}
}
- // After removing bad peers make sure we actually have sufficient peer left to keep downloading
- if d.peers.Len() == 0 {
- return errNoPeers
- }
- // If there are unrequested hashes left start fetching
- // from the available peers.
+ // If there are unrequested hashes left start fetching from the available peers
if d.queue.Pending() > 0 {
// Throttle the download if block cache is full and waiting processing
if d.queue.Throttle() {
- continue
+ break
}
// Send a download request to all idle peers, until throttled
idlePeers := d.peers.IdlePeers()
@@ -475,15 +517,18 @@ out:
}
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
- request := d.queue.Reserve(peer, MaxBlockFetch)
+ request := d.queue.Reserve(peer, peer.Capacity())
if request == nil {
continue
}
+ if glog.V(logger.Detail) {
+ glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
+ }
// Fetch the chunk and check for error. If the peer was somehow
// already fetching a chunk due to a bug, it will be returned to
// the queue
if err := peer.Fetch(request); err != nil {
- glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id)
+ glog.V(logger.Error).Infof("Peer %s received double work", peer.id)
d.queue.Cancel(request)
}
}
@@ -502,10 +547,95 @@ out:
}
}
glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
-
return nil
}
+// banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes,
+// and bans the head of the retrieved batch.
+//
+// This method only fetches one single batch as the goal is not ban an entire
+// (potentially long) invalid chain - wasting a lot of time in the meanwhile -,
+// but rather to gradually build up a blacklist if the peer keeps reconnecting.
+func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
+ glog.V(logger.Debug).Infof("Banning a batch out of %d blocks from %s", d.queue.Pending(), peerId)
+
+ // Ask the peer being banned for a batch of blocks from the banning point
+ peer := d.peers.Peer(peerId)
+ if peer == nil {
+ return nil
+ }
+ request := d.queue.Reserve(peer, MaxBlockFetch)
+ if request == nil {
+ return nil
+ }
+ if err := peer.Fetch(request); err != nil {
+ return err
+ }
+ // Wait a bit for the reply to arrive, and ban if done so
+ timeout := time.After(blockHardTTL)
+ for {
+ select {
+ case <-d.cancelCh:
+ return errCancelBlockFetch
+
+ case <-timeout:
+ return ErrTimeout
+
+ case <-d.hashCh:
+ // Out of bounds hashes received, ignore them
+
+ case blockPack := <-d.blockCh:
+ blocks := blockPack.blocks
+
+ // Short circuit if it's a stale cross check
+ if len(blocks) == 1 {
+ block := blocks[0]
+ if _, ok := d.checks[block.Hash()]; ok {
+ delete(d.checks, block.Hash())
+ break
+ }
+ }
+ // Short circuit if it's not from the peer being banned
+ if blockPack.peerId != peerId {
+ break
+ }
+ // Short circuit if no blocks were returned
+ if len(blocks) == 0 {
+ return errors.New("no blocks returned to ban")
+ }
+ // Reconstruct the original chain order and ensure we're banning the correct blocks
+ types.BlockBy(types.Number).Sort(blocks)
+ if bytes.Compare(blocks[0].Hash().Bytes(), head.Bytes()) != 0 {
+ return errors.New("head block not the banned one")
+ }
+ index := 0
+ for _, block := range blocks[1:] {
+ if bytes.Compare(block.ParentHash().Bytes(), blocks[index].Hash().Bytes()) != 0 {
+ break
+ }
+ index++
+ }
+ // Ban the head hash and phase out any excess
+ d.banned.Add(blocks[index].Hash())
+ for d.banned.Size() > maxBannedHashes {
+ var evacuate common.Hash
+
+ d.banned.Each(func(item interface{}) bool {
+ // Skip any hard coded bans
+ if core.BadHashes[item.(common.Hash)] {
+ return true
+ }
+ evacuate = item.(common.Hash)
+ return false
+ })
+ d.banned.Remove(evacuate)
+ }
+ glog.V(logger.Debug).Infof("Banned %d blocks from: %s", index+1, peerId)
+ return nil
+ }
+ }
+}
+
// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {