diff options
Diffstat (limited to 'eth/sync.go')
-rw-r--r-- | eth/sync.go | 132 |
1 files changed, 126 insertions, 6 deletions
diff --git a/eth/sync.go b/eth/sync.go index 56084f2f0..dd7414da8 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -5,15 +5,135 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" ) -// update periodically tries to synchronise with the network, both downloading -// hashes and blocks as well as retrieving cached ones. -func (pm *ProtocolManager) update() { +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process + notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching + notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + blockProcAmount = 256 +) + +// blockAnnounce is the hash notification of the availability of a new block in +// the network. +type blockAnnounce struct { + hash common.Hash + peer *peer + time time.Time +} + +// fetcher is responsible for collecting hash notifications, and periodically +// checking all unknown ones and individually fetching them. +func (pm *ProtocolManager) fetcher() { + announces := make(map[common.Hash]*blockAnnounce) + request := make(map[*peer][]common.Hash) + pending := make(map[common.Hash]*blockAnnounce) + cycle := time.Tick(notifyCheckCycle) + + // Iterate the block fetching until a quit is requested + for { + select { + case notifications := <-pm.newHashCh: + // A batch of hashes the notified, schedule them for retrieval + glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id) + for _, announce := range notifications { + announces[announce.hash] = announce + } + + case <-cycle: + // Clean up any expired block fetches + for hash, announce := range pending { + if time.Since(announce.time) > notifyFetchTimeout { + delete(pending, hash) + } + } + // Check if any notified blocks failed to arrive + for hash, announce := range announces { + if time.Since(announce.time) > notifyArriveTimeout { + if !pm.chainman.HasBlock(hash) { + request[announce.peer] = append(request[announce.peer], hash) + pending[hash] = announce + } + delete(announces, hash) + } + } + if len(request) == 0 { + break + } + // Send out all block requests + for peer, hashes := range request { + glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id) + peer.requestBlocks(hashes) + } + request = make(map[*peer][]common.Hash) + + case filter := <-pm.newBlockCh: + // Blocks arrived, extract any explicit fetches, return all else + var blocks types.Blocks + select { + case blocks = <-filter: + case <-pm.quitSync: + return + } + + explicit, download := []*types.Block{}, []*types.Block{} + for _, block := range blocks { + hash := block.Hash() + + // Filter explicitly requested blocks from hash announcements + if _, ok := pending[hash]; ok { + // Discard if already imported by other means + if !pm.chainman.HasBlock(hash) { + explicit = append(explicit, block) + } else { + delete(pending, hash) + } + } else { + download = append(download, block) + } + } + + select { + case filter <- download: + case <-pm.quitSync: + return + } + // If any explicit fetches were replied to, import them + if count := len(explicit); count > 0 { + glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", count) + go func() { + for _, block := range explicit { + hash := block.Hash() + + // Make sure there's still something pending to import + if announce := pending[hash]; announce != nil { + delete(pending, hash) + if err := pm.importBlock(announce.peer, block, nil); err != nil { + glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) + return + } + } + } + }() + } + + case <-pm.quitSync: + return + } + } +} + +// syncer is responsible for periodically synchronising with the network, both +// downloading hashes and blocks as well as retrieving cached ones. +func (pm *ProtocolManager) syncer() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) blockProcPend := int32(0) @@ -99,15 +219,15 @@ func (pm *ProtocolManager) synchronise(peer *peer) { return } // Get the hashes from the peer (synchronously) - glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash) err := pm.downloader.Synchronise(peer.id, peer.recentHash) switch err { case nil: - glog.V(logger.Debug).Infof("Synchronisation completed") + glog.V(logger.Detail).Infof("Synchronisation completed") case downloader.ErrBusy: - glog.V(logger.Debug).Infof("Synchronisation already in progress") + glog.V(logger.Detail).Infof("Synchronisation already in progress") case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) |