Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 397
1 file changed, 376 insertions, 21 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 1b31f5dbd..b4154c166 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -19,6 +19,11 @@ import (
 	"gopkg.in/fatih/set.v0"
 )
 
+const (
+	eth60 = 60 // Constant to check for old protocol support
+	eth61 = 61 // Constant to check for new protocol support
+)
+
 var (
 	MinHashFetch = 512  // Minimum amount of hashes to not consider a peer stalling
 	MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request
@@ -58,6 +63,9 @@ type hashCheckFn func(common.Hash) bool
 
 // blockRetrievalFn is a callback type for retrieving a block from the local chain.
 type blockRetrievalFn func(common.Hash) *types.Block
 
+// headRetrievalFn is a callback type for retrieving the head block from the local chain.
+type headRetrievalFn func() *types.Block
+
 // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
 type chainInsertFn func(types.Blocks) (int, error)
 
@@ -98,6 +106,7 @@ type Downloader struct {
 	// Callbacks
 	hasBlock    hashCheckFn      // Checks if a block is present in the chain
 	getBlock    blockRetrievalFn // Retrieves a block from the chain
+	headBlock   headRetrievalFn  // Retrieves the head block from the chain
 	insertChain chainInsertFn    // Injects a batch of blocks into the chain
 	dropPeer    peerDropFn       // Drops a peer for misbehaving
 
@@ -109,8 +118,9 @@ type Downloader struct {
 	// Channels
 	newPeerCh chan *peer
-	hashCh    chan hashPack
-	blockCh   chan blockPack
+	hashCh    chan hashPack  // Channel receiving inbound hashes
+	blockCh   chan blockPack // Channel receiving inbound blocks
+	processCh chan bool      // Channel to signal the block fetcher of new or finished work
 
 	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
 	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
@@ -123,7 +133,7 @@ type Block struct {
 }
 
 // New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
 	// Create the base downloader
 	downloader := &Downloader{
 		mux:         mux,
@@ -131,11 +141,13 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, in
 		peers:       newPeerSet(),
 		hasBlock:    hasBlock,
 		getBlock:    getBlock,
+		headBlock:   headBlock,
 		insertChain: insertChain,
 		dropPeer:    dropPeer,
 		newPeerCh:   make(chan *peer, 1),
 		hashCh:      make(chan hashPack, 1),
 		blockCh:     make(chan blockPack, 1),
+		processCh:   make(chan bool, 1),
 	}
 	// Inject all the known bad hashes
 	downloader.banned = set.New()
@@ -175,7 +187,7 @@ func (d *Downloader) Synchronising() bool {
 
 // RegisterPeer injects a new download peer into the set of block source to be
 // used for fetching hashes and blocks from.
-func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
+func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) error {
 	// If the peer wants to send a banned hash, reject
 	if d.banned.Has(head) {
 		glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id)
@@ -183,7 +195,7 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getH
 	}
 	// Otherwise try to construct and register the peer
 	glog.V(logger.Detail).Infoln("Registering peer", id)
-	if err := d.peers.Register(newPeer(id, version, head, getHashes, getBlocks)); err != nil {
+	if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks)); err != nil {
 		glog.V(logger.Error).Infoln("Register failed:", err)
 		return err
 	}
@@ -289,12 +301,38 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
 		}
 	}()
 
-	glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
-	if err = d.fetchHashes(p, hash); err != nil {
-		return err
-	}
-	if err = d.fetchBlocks(); err != nil {
-		return err
+	glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version)
+	switch p.version {
+	case eth60:
+		// Old eth/60 version, use reverse hash retrieval algorithm
+		if err = d.fetchHashes60(p, hash); err != nil {
+			return err
+		}
+		if err = d.fetchBlocks60(); err != nil {
+			return err
+		}
+	case eth61:
+		// New eth/61, use forward, concurrent hash and block retrieval algorithm
+		number, err := d.findAncestor(p)
+		if err != nil {
+			return err
+		}
+		errc := make(chan error, 2)
+		go func() { errc <- d.fetchHashes(p, number+1) }()
+		go func() { errc <- d.fetchBlocks(number + 1) }()
+
+		// If any fetcher fails, cancel the other
+		if err := <-errc; err != nil {
+			d.cancel()
+			<-errc
+			return err
+		}
+		return <-errc
+
+	default:
+		// Something very wrong, stop right here
+		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
+		return errBadPeer
 	}
 	glog.V(logger.Debug).Infoln("Synchronization completed")
@@ -326,10 +364,10 @@ func (d *Downloader) Terminate() {
 	d.cancel()
 }
 
-// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
+// fetchHashes60 starts retrieving hashes backwards from a specific peer and hash,
 // up until it finds a common ancestor. If the source peer times out, alternative
 // ones are tried for continuation.
-func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
+func (d *Downloader) fetchHashes60(p *peer, h common.Hash) error {
 	var (
 		start    = time.Now()
 		active   = p // active peer will help determine the current active peer
@@ -346,12 +384,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 	<-timeout.C // timeout channel should be initially empty.
 
 	getHashes := func(from common.Hash) {
-		go active.getHashes(from)
+		go active.getRelHashes(from)
 		timeout.Reset(hashTTL)
 	}
 	// Add the hash to the queue, and start hash retrieval.
-	d.queue.Insert([]common.Hash{h})
+	d.queue.Insert([]common.Hash{h}, false)
 	getHashes(h)
 	attempted[p.id] = true
@@ -377,7 +415,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 			if d.banned.Has(hash) {
 				glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id)
 
-				d.queue.Insert(hashPack.hashes[:index+1])
+				d.queue.Insert(hashPack.hashes[:index+1], false)
 				if err := d.banBlocks(active.id, hash); err != nil {
 					glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
 				}
@@ -395,7 +433,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 				}
 			}
 			// Insert all the new hashes, but only continue if got something useful
-			inserts := d.queue.Insert(hashPack.hashes)
+			inserts := d.queue.Insert(hashPack.hashes, false)
 			if len(inserts) == 0 && !done {
 				glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
 				return errBadPeer
@@ -422,9 +460,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 				continue
 			}
 			// We're done, prepare the download cache and proceed pulling the blocks
-			offset := 0
+			offset := uint64(0)
 			if block := d.getBlock(head); block != nil {
-				offset = int(block.NumberU64() + 1)
+				offset = block.NumberU64() + 1
 			}
 			d.queue.Prepare(offset)
 			finished = true
@@ -481,10 +519,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 	return nil
 }
 
-// fetchBlocks iteratively downloads the entire schedules block-chain, taking
+// fetchBlocks60 iteratively downloads the entire scheduled block-chain, taking
 // any available peers, reserving a chunk of blocks for each, wait for delivery
 // and periodically checking for timeouts.
-func (d *Downloader) fetchBlocks() error {
+func (d *Downloader) fetchBlocks60() error {
 	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
 	start := time.Now()
@@ -619,6 +657,323 @@ out:
 	return nil
 }
 
+// findAncestor tries to locate the common ancestor block of the local chain and
+// a remote peer's blockchain. In the general case when our node was in sync and
+// on the correct chain, checking the top N blocks should already get us a match.
+// In the rare scenario when we ended up on a long soft fork (i.e. none of the
+// head blocks match), we do a binary search to find the common ancestor.
+func (d *Downloader) findAncestor(p *peer) (uint64, error) {
+	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
+
+	// Request our head blocks to short circuit ancestor location
+	head := d.headBlock().NumberU64()
+	from := int64(head) - int64(MaxHashFetch)
+	if from < 0 {
+		from = 0
+	}
+	go p.getAbsHashes(uint64(from), MaxHashFetch)
+
+	// Wait for the remote response to the head fetch
+	number, hash := uint64(0), common.Hash{}
+	timeout := time.After(hashTTL)
+
+	for finished := false; !finished; {
+		select {
+		case <-d.cancelCh:
+			return 0, errCancelHashFetch
+
+		case hashPack := <-d.hashCh:
+			// Discard anything not from the origin peer
+			if hashPack.peerId != p.id {
+				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+				break
+			}
+			// Make sure the peer actually gave something valid
+			hashes := hashPack.hashes
+			if len(hashes) == 0 {
+				glog.V(logger.Debug).Infof("%v: empty head hash set", p)
+				return 0, errEmptyHashSet
+			}
+			// Check if a common ancestor was found
+			finished = true
+			for i := len(hashes) - 1; i >= 0; i-- {
+				if d.hasBlock(hashes[i]) {
+					number, hash = uint64(from)+uint64(i), hashes[i]
+					break
+				}
+			}
+
+		case <-d.blockCh:
+			// Out of bounds blocks received, ignore them
+
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
+			return 0, errTimeout
+		}
+	}
+	// If the head fetch already found an ancestor, return
+	if !common.EmptyHash(hash) {
+		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x]", p, number, hash[:4])
+		return number, nil
+	}
+	// Ancestor not found, we need to binary search over our chain
+	start, end := uint64(0), head
+	for start+1 < end {
+		// Split our chain interval in two, and request the hash to cross check
+		check := (start + end) / 2
+
+		timeout := time.After(hashTTL)
+		go p.getAbsHashes(uint64(check), 1)
+
+		// Wait until a reply arrives to this request
+		for arrived := false; !arrived; {
+			select {
+			case <-d.cancelCh:
+				return 0, errCancelHashFetch
+
+			case hashPack := <-d.hashCh:
+				// Discard anything not from the origin peer
+				if hashPack.peerId != p.id {
+					glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+					break
+				}
+				// Make sure the peer actually gave something valid
+				hashes := hashPack.hashes
+				if len(hashes) != 1 {
+					glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
+					return 0, errBadPeer
+				}
+				arrived = true
+
+				// Modify the search interval based on the response
+				block := d.getBlock(hashes[0])
+				if block == nil {
+					end = check
+					break
+				}
+				if block.NumberU64() != check {
+					glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
+					return 0, errBadPeer
+				}
+				start = check
+
+			case <-d.blockCh:
+				// Out of bounds blocks received, ignore them
+
+			case <-timeout:
+				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
+				return 0, errTimeout
+			}
+		}
+	}
+	return start, nil
+}
+
+// fetchHashes keeps retrieving hashes from the requested number, until no more
+// are returned, potentially throttling on the way.
+func (d *Downloader) fetchHashes(p *peer, from uint64) error {
+	glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)
+
+	// Create a timeout timer, and the associated hash fetcher
+	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
+	<-timeout.C                 // timeout channel should be initially empty
+	defer timeout.Stop()
+
+	getHashes := func(from uint64) {
+		go p.getAbsHashes(from, MaxHashFetch)
+		timeout.Reset(hashTTL)
+	}
+	// Start pulling hashes, until all are exhausted
+	getHashes(from)
+	for {
+		select {
+		case <-d.cancelCh:
+			return errCancelHashFetch
+
+		case hashPack := <-d.hashCh:
+			// Make sure the active peer is giving us the hashes
+			if hashPack.peerId != p.id {
+				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+				break
+			}
+			timeout.Stop()
+
+			// If no more hashes are inbound, notify the block fetcher and return
+			if len(hashPack.hashes) == 0 {
+				glog.V(logger.Debug).Infof("%v: no available hashes", p)
+
+				select {
+				case d.processCh <- false:
+				case <-d.cancelCh:
+				}
+				return nil
+			}
+			// Otherwise insert all the new hashes, aborting in case of junk
+			inserts := d.queue.Insert(hashPack.hashes, true)
+			if len(inserts) != len(hashPack.hashes) {
+				glog.V(logger.Debug).Infof("%v: stale hashes", p)
+				return errBadPeer
+			}
+			// Notify the block fetcher of new hashes, and continue fetching
+			select {
+			case d.processCh <- true:
+			default:
+			}
+			from += uint64(len(hashPack.hashes))
+			getHashes(from)
+
+		case <-timeout.C:
+			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
+			return errTimeout
+		}
+	}
+}
+
+// fetchBlocks iteratively downloads the scheduled hashes, taking any available
+// peers, reserving a chunk of blocks for each, waiting for delivery and also
+// periodically checking for timeouts.
+func (d *Downloader) fetchBlocks(from uint64) error {
+	glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
+	defer glog.V(logger.Debug).Infof("Block download terminated")
+
+	// Create a timeout timer for scheduling expiration tasks
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	update := make(chan struct{}, 1)
+
+	// Prepare the queue and fetch blocks until the hash fetcher's done
+	d.queue.Prepare(from)
+	finished := false
+
+	for {
+		select {
+		case <-d.cancelCh:
+			return errCancelBlockFetch
+
+		case blockPack := <-d.blockCh:
+			// If the peer was previously banned and failed to deliver its pack
+			// in a reasonable time frame, ignore its message.
+			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
+				// Deliver the received chunk of blocks, and demote in case of errors
+				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
+				switch err {
+				case nil:
+					// If no blocks were delivered, demote the peer (need the delivery above)
+					if len(blockPack.blocks) == 0 {
+						peer.Demote()
+						peer.SetIdle()
+						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+						break
+					}
+					// All was successful, promote the peer and potentially start processing
+					peer.Promote()
+					peer.SetIdle()
+					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+					go d.process()
+
+				case errInvalidChain:
+					// The hash chain is invalid (blocks are not ordered properly), abort
+					return err
+
+				case errNoFetchesPending:
+					// Peer probably timed out with its delivery but came through
+					// in the end, demote, but allow it to pull from this peer.
+					peer.Demote()
+					peer.SetIdle()
+					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
+
+				case errStaleDelivery:
+					// Delivered something completely different from what was requested,
+					// usually caused by a timeout and delivery during a new sync cycle.
+					// Don't set it to idle as the original request should still be
+					// in flight.
+					peer.Demote()
+					glog.V(logger.Detail).Infof("%s: stale delivery", peer)
+
+				default:
+					// Peer did something semi-useful, demote but keep it around
+					peer.Demote()
+					peer.SetIdle()
+					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
+					go d.process()
+				}
+			}
+			// Blocks arrived, try to update the progress
+			select {
+			case update <- struct{}{}:
+			default:
+			}
+
+		case cont := <-d.processCh:
+			// The hash fetcher sent a continuation flag, check if it's done
+			if !cont {
+				finished = true
+			}
+			// Hashes arrived, try to update the progress
+			select {
+			case update <- struct{}{}:
+			default:
+			}
+
+		case <-ticker.C:
+			// Sanity check: periodically force a progress update
+			select {
+			case update <- struct{}{}:
+			default:
+			}
+
+		case <-update:
+			// Short circuit if we lost all our peers
+			if d.peers.Len() == 0 {
+				return errNoPeers
+			}
+			// Check for block request timeouts and demote the responsible peers
+			for _, pid := range d.queue.Expire(blockHardTTL) {
+				if peer := d.peers.Peer(pid); peer != nil {
+					peer.Demote()
+					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+				}
+			}
+			// If there's nothing more to fetch, wait or terminate
+			if d.queue.Pending() == 0 {
+				if d.queue.InFlight() == 0 && finished {
+					glog.V(logger.Debug).Infof("Block fetching completed")
+					return nil
+				}
+				break
+			}
+			// Send a download request to all idle peers, until throttled
+			for _, peer := range d.peers.IdlePeers() {
+				// Short circuit if throttling activated
+				if d.queue.Throttle() {
+					break
+				}
+				// Reserve a chunk of hashes for a peer. A nil can mean either that
+				// no more hashes are available, or that the peer is known not to
+				// have them.
+				request := d.queue.Reserve(peer, peer.Capacity())
+				if request == nil {
+					continue
+				}
+				if glog.V(logger.Detail) {
+					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
+				}
+				// Fetch the chunk and make sure any errors return the hashes to the queue
+				if err := peer.Fetch(request); err != nil {
+					glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
+					d.queue.Cancel(request)
+				}
+			}
+			// Make sure that we have peers available for fetching. If all peers have
+			// been tried and all failed, throw an error
+			if !d.queue.Throttle() && d.queue.InFlight() == 0 {
+				return errPeersUnavailable
+			}
+		}
+	}
+}
+
 // banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes,
 // and bans the head of the retrieved batch.
 //
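A few of the concurrency patterns this patch introduces are worth spelling out. First, syncWithPeer's eth/61 branch runs the hash and block fetchers concurrently and funnels both results through a two-slot error channel: the first failure cancels the sibling, and the channel is drained before returning so neither goroutine leaks. Below is a minimal, self-contained sketch of that cancel-and-drain shape; the fetch helper and its timings are illustrative stand-ins, not code from the patch.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    func main() {
        cancelCh := make(chan struct{})

        // Dummy fetcher: does a few rounds of "work", optionally failing, and
        // aborts promptly once cancelCh is closed (like errCancel* in the patch).
        fetch := func(name string, fail bool) error {
            for i := 0; ; i++ {
                select {
                case <-cancelCh:
                    return errors.New(name + ": cancelled")
                case <-time.After(5 * time.Millisecond):
                    if fail && i == 2 {
                        return errors.New(name + ": fetch failed")
                    }
                    if i >= 10 {
                        return nil // all work done
                    }
                }
            }
        }
        // Two-slot channel so neither goroutine ever blocks on its send
        errc := make(chan error, 2)
        go func() { errc <- fetch("hash fetcher", true) }()
        go func() { errc <- fetch("block fetcher", false) }()

        // If the first fetcher to finish failed, cancel and drain the other
        if err := <-errc; err != nil {
            close(cancelCh)
            fmt.Println("sibling:", <-errc)
            fmt.Println("sync aborted:", err)
            return
        }
        fmt.Println("sync completed, second result:", <-errc)
    }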
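Second, findAncestor's fallback is a plain binary search over block heights: the invariant is that height start is always shared with the peer while height end may not be, so a locally known hash moves start up and an unknown one moves end down. A standalone sketch of just that interval logic, with a hypothetical matches predicate standing in for the getAbsHashes round trip plus the local getBlock lookup:

    package main

    import "fmt"

    // findCommonHeight binary searches [0, head] for the highest height at
    // which the predicate still holds, mirroring the "start = check" /
    // "end = check" interval updates in findAncestor.
    func findCommonHeight(head uint64, matches func(height uint64) bool) uint64 {
        start, end := uint64(0), head
        for start+1 < end {
            check := (start + end) / 2
            if matches(check) {
                start = check // chains still agree at this height
            } else {
                end = check // divergence is at or below the midpoint
            }
        }
        return start
    }

    func main() {
        // Toy chain: heights 0..99 are shared with the peer, 100+ diverge.
        shared := uint64(99)
        ancestor := findCommonHeight(1000, func(h uint64) bool { return h <= shared })
        fmt.Println("common ancestor at height", ancestor) // -> 99
    }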
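Finally, the hand-off between fetchHashes and fetchBlocks rides on the one-slot processCh: progress notifications (true) are sent non-blocking, so duplicates collapse while one is already pending, but the terminal false is sent blocking (racing only against cancellation) so completion can never be dropped. A runnable toy of the same pattern, with dummy producer and consumer loops in place of the real fetchers:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        processCh := make(chan bool, 1) // true: new work queued, false: producer done
        done := make(chan struct{})

        // Hash-fetcher side: progress signals are best-effort (a pending
        // notification absorbs later ones); the final "finished" signal is
        // sent blocking so the block fetcher cannot miss it.
        go func() {
            for i := 0; i < 5; i++ {
                time.Sleep(10 * time.Millisecond) // pretend to pull a hash batch
                select {
                case processCh <- true:
                default: // a wake-up is already pending, no need for another
                }
            }
            processCh <- false
        }()

        // Block-fetcher side: drain continuation flags until the producer is done.
        go func() {
            for cont := range processCh {
                fmt.Println("woken up, more work coming:", cont)
                if !cont {
                    close(done)
                    return
                }
            }
        }()
        <-done
    }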