diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 667 |
1 files changed, 438 insertions, 229 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 64fb1b57b..7ae7aa221 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -19,8 +19,10 @@ package downloader import ( "errors" + "fmt" "math" "math/big" + "strings" "sync" "sync/atomic" "time" @@ -32,70 +34,96 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) -const ( - eth61 = 61 // Constant to check for old protocol support - eth62 = 62 // Constant to check for new protocol support -) - var ( - MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request - MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request - MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request - MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request - MaxStateFetch = 384 // Amount of node state values to allow fetching per request - MaxReceiptsFetch = 384 // Amount of transaction receipts to allow fetching per request - - hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out - blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth - blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired - headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out - bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth - bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired - - maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) - maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) - maxBlockProcess = 256 // Number of blocks to import at once into the chain + MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request + MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request + MaxStateFetch = 384 // Amount of node state values to allow fetching per request + MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request + + hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out + blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth + blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired + headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out + bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth + bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired + receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth + receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired + + maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) + maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) + maxResultsProcess = 256 // Number of download results to import at once into the chain + + headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync + minCheckedHeaders = 1024 // Number of headers to verify fully when approaching the chain head + minFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errEmptyHeaderSet = errors.New("empty header set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errInvalidBody = errors.New("retrieved block body is invalid") - errCancelHashFetch = errors.New("hash fetching canceled (requested)") - errCancelBlockFetch = errors.New("block downloading canceled (requested)") - errCancelHeaderFetch = errors.New("block header fetching canceled (requested)") - errCancelBodyFetch = errors.New("block body downloading canceled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errNoPeers = errors.New("no peers to keep download active") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errEmptyHeaderSet = errors.New("empty header set by peer") + errPeersUnavailable = errors.New("no peers available or all tried for download") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errInvalidBlock = errors.New("retrieved block is invalid") + errInvalidBody = errors.New("retrieved block body is invalid") + errInvalidReceipt = errors.New("retrieved receipt is invalid") + errCancelHashFetch = errors.New("hash download canceled (requested)") + errCancelBlockFetch = errors.New("block download canceled (requested)") + errCancelHeaderFetch = errors.New("block header download canceled (requested)") + errCancelBodyFetch = errors.New("block body download canceled (requested)") + errCancelReceiptFetch = errors.New("receipt download canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) -// hashCheckFn is a callback type for verifying a hash's presence in the local chain. -type hashCheckFn func(common.Hash) bool +// headerCheckFn is a callback type for verifying a header's presence in the local chain. +type headerCheckFn func(common.Hash) bool + +// blockCheckFn is a callback type for verifying a block's presence in the local chain. +type blockCheckFn func(common.Hash) bool + +// headerRetrievalFn is a callback type for retrieving a header from the local chain. +type headerRetrievalFn func(common.Hash) *types.Header // blockRetrievalFn is a callback type for retrieving a block from the local chain. type blockRetrievalFn func(common.Hash) *types.Block -// headRetrievalFn is a callback type for retrieving the head block from the local chain. -type headRetrievalFn func() *types.Block +// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain. +type headHeaderRetrievalFn func() *types.Header + +// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. +type headBlockRetrievalFn func() *types.Block // tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. type tdRetrievalFn func(common.Hash) *big.Int -// chainInsertFn is a callback type to insert a batch of blocks into the local chain. -type chainInsertFn func(types.Blocks) (int, error) +// headerChainInsertFn is a callback type to insert a batch of headers into the local chain. +type headerChainInsertFn func([]*types.Header, bool) (int, error) + +// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain. +type blockChainInsertFn func(types.Blocks) (int, error) + +// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain. +type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error) // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) +// dataPack is a data message returned by a peer for some query. +type dataPack interface { + PeerId() string + Empty() bool + Stats() string +} + // hashPack is a batch of block hashes returned by a peer (eth/61). type hashPack struct { peerId string @@ -121,8 +149,33 @@ type bodyPack struct { uncles [][]*types.Header } +// PeerId retrieves the origin peer who sent this block body packet. +func (p *bodyPack) PeerId() string { return p.peerId } + +// Empty returns whether the no block bodies were delivered. +func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 } + +// Stats creates a textual stats report for logging purposes. +func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) } + +// receiptPack is a batch of receipts returned by a peer. +type receiptPack struct { + peerId string + receipts [][]*types.Receipt +} + +// PeerId retrieves the origin peer who sent this receipt packet. +func (p *receiptPack) PeerId() string { return p.peerId } + +// Empty returns whether the no receipts were delivered. +func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 } + +// Stats creates a textual stats report for logging purposes. +func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) } + type Downloader struct { - mux *event.TypeMux + mode SyncMode // Synchronisation mode defining the strategies used + mux *event.TypeMux // Event multiplexer to announce sync operation events queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed @@ -135,12 +188,17 @@ type Downloader struct { syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks - hasBlock hashCheckFn // Checks if a block is present in the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headBlock headRetrievalFn // Retrieves the head block from the chain - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertChain chainInsertFn // Injects a batch of blocks into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + hasHeader headerCheckFn // Checks if a header is present in the chain + hasBlock blockCheckFn // Checks if a block is present in the chain + getHeader headerRetrievalFn // Retrieves a header from the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + headHeader headHeaderRetrievalFn // Retrieves the head header from the chain + headBlock headBlockRetrievalFn // Retrieves the head block from the chain + getTd tdRetrievalFn // Retrieves the TD of a block from the chain + insertHeaders headerChainInsertFn // Injects a batch of headers into the chain + insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain + insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -149,46 +207,56 @@ type Downloader struct { notified int32 // Channels - newPeerCh chan *peer - hashCh chan hashPack // [eth/61] Channel receiving inbound hashes - blockCh chan blockPack // [eth/61] Channel receiving inbound blocks - headerCh chan headerPack // [eth/62] Channel receiving inbound block headers - bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies - wakeCh chan bool // Channel to signal the block/body fetcher of new tasks + newPeerCh chan *peer + hashCh chan hashPack // [eth/61] Channel receiving inbound hashes + blockCh chan blockPack // [eth/61] Channel receiving inbound blocks + headerCh chan headerPack // [eth/62] Channel receiving inbound block headers + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers // Testing hooks - syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run - bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch - chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) -} - -// Block is an origin-tagged blockchain block. -type Block struct { - RawBlock *types.Block - OriginPeer string + syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run + bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch + receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch + chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, getTd tdRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, + headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, + insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { + return &Downloader{ - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasBlock: hasBlock, - getBlock: getBlock, - headBlock: headBlock, - getTd: getTd, - insertChain: insertChain, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), - headerCh: make(chan headerPack, 1), - bodyCh: make(chan bodyPack, 1), - wakeCh: make(chan bool, 1), + mode: mode, + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + hasHeader: hasHeader, + hasBlock: hasBlock, + getHeader: getHeader, + getBlock: getBlock, + headHeader: headHeader, + headBlock: headBlock, + getTd: getTd, + insertHeaders: insertHeaders, + insertBlocks: insertBlocks, + insertReceipts: insertReceipts, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan hashPack, 1), + blockCh: make(chan blockPack, 1), + headerCh: make(chan headerPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + blockWakeCh: make(chan bool, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), } } @@ -211,10 +279,10 @@ func (d *Downloader) Synchronising() bool { // used for fetching hashes and blocks from. func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) error { + getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error { glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -222,13 +290,15 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, } // UnregisterPeer remove a peer from the known list, preventing any action from -// the specified peer. +// the specified peer. An effort is also made to return any pending fetches into +// the queue. func (d *Downloader) UnregisterPeer(id string) error { glog.V(logger.Detail).Infoln("Unregistering peer", id) if err := d.peers.Unregister(id); err != nil { glog.V(logger.Error).Infoln("Unregister failed:", err) return err } + d.queue.Revoke(id) return nil } @@ -275,16 +345,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error glog.V(logger.Info).Infoln("Block synchronisation started") } // Abort if the queue still contains some leftover data - if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { + if d.queue.GetHeadResult() != nil { return errPendingQueue } - // Reset the queue and peer set to clean any internal leftover state + // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() d.peers.Reset() - select { - case <-d.wakeCh: - default: + for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} { + select { + case <-ch: + default: + } } // Create cancel channel for aborting mid-flight d.cancelLock.Lock() @@ -299,12 +371,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error return d.syncWithPeer(p, hash, td) } +/* // Has checks if the downloader knows about a particular hash, meaning that its // either already downloaded of pending retrieval. func (d *Downloader) Has(hash common.Hash) bool { return d.queue.Has(hash) } - +*/ // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) { @@ -323,7 +396,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e defer glog.V(logger.Debug).Infof("Synchronisation terminated") switch { - case p.version == eth61: + case p.version == 61: // Look up the sync boundaries: the common ancestor and the target block latest, err := d.fetchHeight61(p) if err != nil { @@ -344,6 +417,8 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e if d.syncInitHook != nil { d.syncInitHook(origin, latest) } + d.queue.Prepare(origin+1, 1) + errc := make(chan error, 2) go func() { errc <- d.fetchHashes61(p, td, origin+1) }() go func() { errc <- d.fetchBlocks61(origin + 1) }() @@ -356,7 +431,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e } return <-errc - case p.version >= eth62: + case p.version >= 62: // Look up the sync boundaries: the common ancestor and the target block latest, err := d.fetchHeight(p) if err != nil { @@ -373,21 +448,32 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsHeight = latest d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent hash and block retrieval algorithm + // Initiate the sync using a concurrent header and content retrieval algorithm + parts := 1 + if d.mode == FastSync { + parts = 2 // receipts are fetched too + } + d.queue.Prepare(origin+1, parts) + if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 2) - go func() { errc <- d.fetchHeaders(p, td, origin+1) }() - go func() { errc <- d.fetchBodies(origin + 1) }() - - // If any fetcher fails, cancel the other - if err := <-errc; err != nil { - d.cancel() - <-errc - return err + errc := make(chan error, 3) + go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved + go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync + go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync + + // If any fetcher fails, cancel the others + var fail error + for i := 0; i < cap(errc); i++ { + if err := <-errc; err != nil { + if fail == nil { + fail = err + d.cancel() + } + } } - return <-errc + return fail default: // Something very wrong, stop right here @@ -637,7 +723,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: no available hashes", p) select { - case d.wakeCh <- false: + case d.blockWakeCh <- false: case <-d.cancelCh: } // If no hashes were retrieved at all, the peer violated it's TD promise that it had a @@ -660,24 +746,24 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { gotHashes = true // Otherwise insert all the new hashes, aborting in case of junk - glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from) + glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from) - inserts := d.queue.Insert61(hashPack.hashes, true) + inserts := d.queue.Schedule61(hashPack.hashes, true) if len(inserts) != len(hashPack.hashes) { glog.V(logger.Debug).Infof("%v: stale hashes", p) return errBadPeer } // Notify the block fetcher of new hashes, but stop if queue is full - if d.queue.Pending() < maxQueuedHashes { + if d.queue.PendingBlocks() < maxQueuedHashes { // We still have hashes to fetch, send continuation wake signal (potential) select { - case d.wakeCh <- true: + case d.blockWakeCh <- true: default: } } else { // Hash limit reached, send a termination wake signal (enforced) select { - case d.wakeCh <- false: + case d.blockWakeCh <- false: case <-d.cancelCh: } return nil @@ -707,10 +793,8 @@ func (d *Downloader) fetchBlocks61(from uint64) error { update := make(chan struct{}, 1) - // Prepare the queue and fetch blocks until the hash fetcher's done - d.queue.Prepare(from) + // Fetch blocks until the hash fetcher's done finished := false - for { select { case <-d.cancelCh: @@ -733,13 +817,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // If no blocks were delivered, demote the peer (need the delivery above) if len(blockPack.blocks) == 0 { peer.Demote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } // All was successful, promote the peer and potentially start processing peer.Promote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) go d.process() @@ -751,7 +835,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Peer probably timed out with its delivery but came through // in the end, demote, but allow to to pull from this peer. peer.Demote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) case errStaleDelivery: @@ -765,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { default: // Peer did something semi-useful, demote but keep it around peer.Demote() - peer.SetIdle61() + peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) go d.process() } @@ -776,7 +860,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { default: } - case cont := <-d.wakeCh: + case cont := <-d.blockWakeCh: // The hash fetcher sent a continuation flag, check if it's done if !cont { finished = true @@ -800,14 +884,14 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.Expire(blockHardTTL) { + for _, pid := range d.queue.Expire61(blockHardTTL) { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) } } - // If there's noting more to fetch, wait or terminate - if d.queue.Pending() == 0 { + // If there's nothing more to fetch, wait or terminate + if d.queue.PendingBlocks() == 0 { if d.queue.InFlight() == 0 && finished { glog.V(logger.Debug).Infof("Block fetching completed") return nil @@ -816,16 +900,18 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // Send a download request to all idle peers, until throttled throttled := false - for _, peer := range d.peers.IdlePeers(eth61) { + idles, total := d.peers.BlockIdlePeers(61) + + for _, peer := range idles { // Short circuit if throttling activated - if d.queue.Throttle() { + if d.queue.ThrottleBlocks() { throttled = true break } // Reserve a chunk of hashes for a peer. A nil can mean either that // no more hashes are available, or that the peer is known not to // have them. - request := d.queue.Reserve61(peer, peer.Capacity()) + request := d.queue.Reserve61(peer, peer.BlockCapacity()) if request == nil { continue } @@ -835,12 +921,12 @@ func (d *Downloader) fetchBlocks61(from uint64) error { // Fetch the chunk and make sure any errors return the hashes to the queue if err := peer.Fetch61(request); err != nil { glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) - d.queue.Cancel(request) + d.queue.Cancel61(request) } } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !throttled && d.queue.InFlight() == 0 { + if !throttled && d.queue.InFlight() == 0 && len(idles) == total { return errPeersUnavailable } } @@ -891,16 +977,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { } } -// findAncestor tries to locate the common ancestor block of the local chain and +// findAncestor tries to locate the common ancestor link of the local chain and // a remote peers blockchain. In the general case when our node was in sync and -// on the correct chain, checking the top N blocks should already get us a match. +// on the correct chain, checking the top N links should already get us a match. // In the rare scenario when we ended up on a long reorganization (i.e. none of -// the head blocks match), we do a binary search to find the common ancestor. +// the head links match), we do a binary search to find the common ancestor. func (d *Downloader) findAncestor(p *peer) (uint64, error) { glog.V(logger.Debug).Infof("%v: looking for common ancestor", p) - // Request our head blocks to short circuit ancestor location - head := d.headBlock().NumberU64() + // Request our head headers to short circuit ancestor location + head := d.headHeader().Number.Uint64() + if d.mode == FullSync { + head = d.headBlock().NumberU64() + } from := int64(head) - int64(MaxHeaderFetch) + 1 if from < 0 { from = 0 @@ -931,7 +1020,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { // Check if a common ancestor was found finished = true for i := len(headers) - 1; i >= 0; i-- { - if d.hasBlock(headers[i].Hash()) { + if (d.mode == FullSync && d.hasBlock(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() break } @@ -986,13 +1075,13 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { arrived = true // Modify the search interval based on the response - block := d.getBlock(headers[0].Hash()) - if block == nil { + if (d.mode == FullSync && !d.hasBlock(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) { end = check break } - if block.NumberU64() != check { - glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check) + header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists + if header.Number.Uint64() != check { + glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check) return 0, errBadPeer } start = check @@ -1017,6 +1106,9 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { // fetchHeaders keeps retrieving headers from the requested number, until no more // are returned, potentially throttling on the way. +// +// The queue parameter can be used to switch between queuing headers for block +// body download too, or directly import as pure header chains. func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from) defer glog.V(logger.Debug).Infof("%v: header download terminated", p) @@ -1058,13 +1150,15 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { headerReqTimer.UpdateSince(request) timeout.Stop() - // If no more headers are inbound, notify the body fetcher and return + // If no more headers are inbound, notify the content fetchers and return if len(headerPack.headers) == 0 { glog.V(logger.Debug).Infof("%v: no available headers", p) - select { - case d.wakeCh <- false: - case <-d.cancelCh: + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } } // If no headers were retrieved at all, the peer violated it's TD promise that it had a // better chain compared to ours. The only exception is if it's promised blocks were @@ -1086,27 +1180,37 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { gotHeaders = true // Otherwise insert all the new headers, aborting in case of junk - glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from) + glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from) - inserts := d.queue.Insert(headerPack.headers, from) - if len(inserts) != len(headerPack.headers) { - glog.V(logger.Debug).Infof("%v: stale headers", p) - return errBadPeer - } - // Notify the block fetcher of new headers, but stop if queue is full - if d.queue.Pending() < maxQueuedHeaders { - // We still have headers to fetch, send continuation wake signal (potential) - select { - case d.wakeCh <- true: - default: + if d.mode == FullSync || d.mode == FastSync { + inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync) + if len(inserts) != len(headerPack.headers) { + glog.V(logger.Debug).Infof("%v: stale headers", p) + return errBadPeer } } else { - // Header limit reached, send a termination wake signal (enforced) - select { - case d.wakeCh <- false: - case <-d.cancelCh: + if n, err := d.insertHeaders(headerPack.headers, true); err != nil { + glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err) + return errInvalidChain + } + } + // Notify the content fetchers of new headers, but stop if queue is full + cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + if cont { + // We still have headers to fetch, send continuation wake signal (potential) + select { + case ch <- true: + default: + } + } else { + // Header limit reached, send a termination wake signal (enforced) + select { + case ch <- false: + case <-d.cancelCh: + } + return nil } - return nil } // Queue not yet full, fetch the next batch from += uint64(len(headerPack.headers)) @@ -1119,9 +1223,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { d.dropPeer(p.id) // Finish the sync gracefully instead of dumping the gathered data though - select { - case d.wakeCh <- false: - case <-d.cancelCh: + for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { + select { + case ch <- false: + case <-d.cancelCh: + } } return nil } @@ -1133,22 +1239,69 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // and also periodically checking for timeouts. func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) - defer glog.V(logger.Debug).Infof("Block body download terminated") - // Create a timeout timer for scheduling expiration tasks + var ( + deliver = func(packet interface{}) error { + pack := packet.(*bodyPack) + return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles) + } + expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } + capacity = func(p *peer) int { return p.BlockCapacity() } + getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) } + setIdle = func(p *peer) { p.SetBlocksIdle() } + ) + err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, + d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook, + fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body") + + glog.V(logger.Debug).Infof("Block body download terminated: %v", err) + return err +} + +// fetchReceipts iteratively downloads the scheduled block receipts, taking any +// available peers, reserving a chunk of receipts for each, waiting for delivery +// and also periodically checking for timeouts. +func (d *Downloader) fetchReceipts(from uint64) error { + glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) + + var ( + deliver = func(packet interface{}) error { + pack := packet.(*receiptPack) + return d.queue.DeliverReceipts(pack.peerId, pack.receipts) + } + expire = func() []string { return d.queue.ExpireReceipts(bodyHardTTL) } + fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } + capacity = func(p *peer) int { return p.ReceiptCapacity() } + setIdle = func(p *peer) { p.SetReceiptsIdle() } + ) + err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, + d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, + fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") + + glog.V(logger.Debug).Infof("Receipt download terminated: %v", err) + return err +} + +// fetchParts iteratively downloads scheduled block parts, taking any available +// peers, reserving a chunk of fetch requests for each, waiting for delivery and +// also periodically checking for timeouts. +func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool, + expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), + fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error { + + // Create a ticker to detect expired retreival tasks ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() update := make(chan struct{}, 1) - // Prepare the queue and fetch block bodies until the block header fetcher's done - d.queue.Prepare(from) + // Prepare the queue and fetch block parts until the block header fetcher's done finished := false - for { select { case <-d.cancelCh: - return errCancelBlockFetch + return errCancel case <-d.hashCh: // Out of bounds eth/61 hashes received, ignore them @@ -1156,42 +1309,41 @@ func (d *Downloader) fetchBodies(from uint64) error { case <-d.blockCh: // Out of bounds eth/61 blocks received, ignore them - case bodyPack := <-d.bodyCh: + case packet := <-deliveryCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. - if peer := d.peers.Peer(bodyPack.peerId); peer != nil { - // Deliver the received chunk of bodies, and demote in case of errors - err := d.queue.Deliver(bodyPack.peerId, bodyPack.transactions, bodyPack.uncles) - switch err { + if peer := d.peers.Peer(packet.PeerId()); peer != nil { + // Deliver the received chunk of data, and demote in case of errors + switch err := deliver(packet); err { case nil: - // If no blocks were delivered, demote the peer (need the delivery above) - if len(bodyPack.transactions) == 0 || len(bodyPack.uncles) == 0 { + // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) + if packet.Empty() { peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: no block bodies delivered", peer) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) break } // All was successful, promote the peer and potentially start processing peer.Promote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: delivered %d:%d block bodies", peer, len(bodyPack.transactions), len(bodyPack.uncles)) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) go d.process() case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort return err - case errInvalidBody: + case errInvalidBody, errInvalidReceipt: // The peer delivered something very bad, drop immediately - glog.V(logger.Error).Infof("%s: delivered invalid block, dropping", peer) + glog.V(logger.Error).Infof("%s: delivered invalid %s, dropping", peer, strings.ToLower(kind)) d.dropPeer(peer.id) case errNoFetchesPending: // Peer probably timed out with its delivery but came through // in the end, demote, but allow to to pull from this peer. peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind)) case errStaleDelivery: // Delivered something completely else than requested, usually @@ -1199,13 +1351,13 @@ func (d *Downloader) fetchBodies(from uint64) error { // Don't set it to idle as the original request should still be // in flight. peer.Demote() - glog.V(logger.Detail).Infof("%s: stale delivery", peer) + glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind)) default: // Peer did something semi-useful, demote but keep it around peer.Demote() - peer.SetIdle() - glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) + setIdle(peer) + glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) go d.process() } } @@ -1215,7 +1367,7 @@ func (d *Downloader) fetchBodies(from uint64) error { default: } - case cont := <-d.wakeCh: + case cont := <-wakeCh: // The header fetcher sent a continuation flag, check if it's done if !cont { finished = true @@ -1238,65 +1390,69 @@ func (d *Downloader) fetchBodies(from uint64) error { if d.peers.Len() == 0 { return errNoPeers } - // Check for block body request timeouts and demote the responsible peers - for _, pid := range d.queue.Expire(bodyHardTTL) { + // Check for fetch request timeouts and demote the responsible peers + for _, pid := range expire() { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() - glog.V(logger.Detail).Infof("%s: block body delivery timeout", peer) + glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) } } - // If there's noting more to fetch, wait or terminate - if d.queue.Pending() == 0 { + // If there's nothing more to fetch, wait or terminate + if pending() == 0 { if d.queue.InFlight() == 0 && finished { - glog.V(logger.Debug).Infof("Block body fetching completed") + glog.V(logger.Debug).Infof("%s fetching completed", kind) return nil } break } // Send a download request to all idle peers, until throttled - queuedEmptyBlocks, throttled := false, false - for _, peer := range d.peers.IdlePeers(eth62) { + progressed, throttled := false, false + idles, total := idle() + + for _, peer := range idles { // Short circuit if throttling activated - if d.queue.Throttle() { + if throttle() { throttled = true break } - // Reserve a chunk of hashes for a peer. A nil can mean either that - // no more hashes are available, or that the peer is known not to + // Reserve a chunk of fetches for a peer. A nil can mean either that + // no more headers are available, or that the peer is known not to // have them. - request, process, err := d.queue.Reserve(peer, peer.Capacity()) + request, progress, err := reserve(peer, capacity(peer)) if err != nil { return err } - if process { - queuedEmptyBlocks = true + if progress { + progressed = true go d.process() } if request == nil { continue } if glog.V(logger.Detail) { - glog.Infof("%s: requesting %d block bodies", peer, len(request.Headers)) + glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) } // Fetch the chunk and make sure any errors return the hashes to the queue - if d.bodyFetchHook != nil { - d.bodyFetchHook(request.Headers) + if fetchHook != nil { + fetchHook(request.Headers) } - if err := peer.Fetch(request); err != nil { - glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) - d.queue.Cancel(request) + if err := fetch(peer, request); err != nil { + glog.V(logger.Error).Infof("%v: %s fetch failed, rescheduling", peer, strings.ToLower(kind)) + cancel(request) } } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !queuedEmptyBlocks && !throttled && d.queue.InFlight() == 0 { + if !progressed && !throttled && d.queue.InFlight() == 0 && len(idles) == total { return errPeersUnavailable } } } } -// process takes blocks from the queue and tries to import them into the chain. +// process takes fetch results from the queue and tries to import them into the +// chain. The type of import operation will depend on the result contents: +// - // // The algorithmic flow is as follows: // - The `processing` flag is swapped to 1 to ensure singleton access @@ -1317,10 +1473,10 @@ func (d *Downloader) process() { } // If the processor just exited, but there are freshly pending items, try to // reenter. This is needed because the goroutine spinned up for processing - // the fresh blocks might have been rejected entry to to this present thread + // the fresh results might have been rejected entry to to this present thread // not yet releasing the `processing` state. defer func() { - if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil { + if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil { d.process() } }() @@ -1328,38 +1484,64 @@ func (d *Downloader) process() { // the import statistics to zero. defer atomic.StoreInt32(&d.processing, 0) - // Repeat the processing as long as there are blocks to import + // Repeat the processing as long as there are results to process for { - // Fetch the next batch of blocks - blocks := d.queue.TakeBlocks() - if len(blocks) == 0 { + // Fetch the next batch of results + results := d.queue.TakeResults() + if len(results) == 0 { return } if d.chainInsertHook != nil { - d.chainInsertHook(blocks) + d.chainInsertHook(results) } // Actually import the blocks - glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) - for len(blocks) != 0 { + if glog.V(logger.Debug) { + first, last := results[0].Header, results[len(results)-1].Header + glog.V(logger.Debug).Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4]) + } + for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { return } - // Retrieve the first batch of blocks to insert - max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) - raw := make(types.Blocks, 0, max) - for _, block := range blocks[:max] { - raw = append(raw, block.RawBlock) + // Retrieve the a batch of results to import + var ( + headers = make([]*types.Header, 0, maxResultsProcess) + blocks = make([]*types.Block, 0, maxResultsProcess) + receipts = make([]types.Receipts, 0, maxResultsProcess) + ) + items := int(math.Min(float64(len(results)), float64(maxResultsProcess))) + for _, result := range results[:items] { + switch { + case d.mode == FullSync: + blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) + case d.mode == FastSync: + blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)) + receipts = append(receipts, result.Receipts) + case d.mode == LightSync: + headers = append(headers, result.Header) + } + } + // Try to process the results, aborting if there's an error + var ( + err error + index int + ) + switch { + case d.mode == FullSync: + index, err = d.insertBlocks(blocks) + case d.mode == FastSync: + index, err = d.insertReceipts(blocks, receipts) + case d.mode == LightSync: + index, err = d.insertHeaders(headers, true) } - // Try to inset the blocks, drop the originating peer if there's an error - index, err := d.insertChain(raw) if err != nil { - glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) - d.dropPeer(blocks[index].OriginPeer) + glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash(), err) d.cancel() return } - blocks = blocks[max:] + // Shift the results to the next batch + results = results[items:] } } } @@ -1468,7 +1650,34 @@ func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transactio d.cancelLock.RUnlock() select { - case d.bodyCh <- bodyPack{id, transactions, uncles}: + case d.bodyCh <- &bodyPack{id, transactions, uncles}: + return nil + + case <-cancel: + return errNoSyncActive + } +} + +// DeliverReceipts injects a new batch of receipts received from a remote node. +func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) { + // Update the delivery metrics for both good and failed deliveries + receiptInMeter.Mark(int64(len(receipts))) + defer func() { + if err != nil { + receiptDropMeter.Mark(int64(len(receipts))) + } + }() + // Make sure the downloader is active + if atomic.LoadInt32(&d.synchronising) == 0 { + return errNoSyncActive + } + // Deliver or abort if the sync is canceled while queuing + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + + select { + case d.receiptCh <- &receiptPack{id, receipts}: return nil case <-cancel: |