diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-06-05 01:42:34 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-06-05 01:42:34 +0800 |
commit | 10fc73376789b1b016fbbd86df3b378df0238a0c (patch) | |
tree | aff4fb03c3fdb054bef7ad132e2ce32ff8668698 | |
parent | 861031491860c69f29e1444c22dd84c38abfedf4 (diff) | |
parent | d754c25cc87172dfafeea71116da2260544a3f09 (diff) | |
download | go-tangerine-10fc73376789b1b016fbbd86df3b378df0238a0c.tar.gz go-tangerine-10fc73376789b1b016fbbd86df3b378df0238a0c.tar.zst go-tangerine-10fc73376789b1b016fbbd86df3b378df0238a0c.zip |
Merge pull request #1184 from karalabe/nonstop-block-fetches
eth/downloader: fix #1178, don't request blocks beyond the cache bounds
-rw-r--r-- | eth/downloader/downloader.go | 36 | ||||
-rw-r--r-- | eth/downloader/peer.go | 15 | ||||
-rw-r--r-- | eth/downloader/queue.go | 2 |
3 files changed, 33 insertions, 20 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f52a97610..d8dbef726 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -281,19 +281,19 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { case hashPack := <-d.hashCh: // Make sure the active peer is giving us the hashes if hashPack.peerId != active.id { - glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId) + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) break } timeout.Reset(hashTTL) // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { - glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) return errEmptyHashSet } for _, hash := range hashPack.hashes { if d.banned.Has(hash) { - glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id) return ErrInvalidChain } } @@ -301,7 +301,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { done, index := false, 0 for index, head = range hashPack.hashes { if d.hasBlock(head) || d.queue.GetBlock(head) != nil { - glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4]) + glog.V(logger.Debug).Infof("Found common hash %x", head[:4]) hashPack.hashes = hashPack.hashes[:index] done = true break @@ -310,7 +310,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Insert all the new hashes, but only continue if got something useful inserts := d.queue.Insert(hashPack.hashes) if len(inserts) == 0 && !done { - glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id) + glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) return ErrBadPeer } if !done { @@ -365,7 +365,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } case <-timeout.C: - glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id) + glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id) var p *peer // p will be set if a peer can be found // Attempt to find a new peer by checking inclusion of peers best hash in our @@ -386,10 +386,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // by our previous (delayed) peer. active = p p.getHashes(head) - glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id) + glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id) } } - glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start)) + glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start)) return nil } @@ -421,24 +421,29 @@ out: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(blockPack.peerId); peer != nil { - // Deliver the received chunk of blocks + // Deliver the received chunk of blocks, and demote in case of errors if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { if err == ErrInvalidChain { // The hash chain is invalid (blocks are not ordered properly), abort return err } // Peer did deliver, but some blocks were off, penalize - glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) peer.Demote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) break } - if glog.V(logger.Debug) && len(blockPack.blocks) > 0 { - glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) + // If no blocks were delivered, demote the peer (above code is needed to mark the packet done!) + if len(blockPack.blocks) == 0 { + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + break } - // Promote the peer and update it's idle state + // All was successful, promote the peer peer.Promote() peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) } case <-ticker.C: // Check for bad peers. Bad peers may indicate a peer not responding @@ -481,11 +486,14 @@ out: if request == nil { continue } + if glog.V(logger.Detail) { + glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes)) + } // Fetch the chunk and check for error. If the peer was somehow // already fetching a chunk due to a bug, it will be returned to // the queue if err := peer.Fetch(request); err != nil { - glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id) + glog.V(logger.Error).Infof("Peer %s received double work", peer.id) d.queue.Cancel(request) } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 8ef017df7..43b50079b 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -5,14 +5,13 @@ package downloader import ( "errors" + "fmt" "math" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "gopkg.in/fatih/set.v0" ) @@ -100,9 +99,6 @@ func (p *peer) SetIdle() { if next == 1 { p.Demote() } - if prev != next { - glog.V(logger.Detail).Infof("%s: changing block download capacity from %d to %d", p.id, prev, next) - } break } } @@ -135,6 +131,15 @@ func (p *peer) Demote() { } } +// String implements fmt.Stringer. +func (p *peer) String() string { + return fmt.Sprintf("Peer %s [%s]", p.id, + fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ + fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+ + fmt.Sprintf("ignored %4d", p.ignored.Size()), + ) +} + // peerSet represents the collection of active peer participating in the block // download procedure. type peerSet struct { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 69d91512a..02fa667f1 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -225,7 +225,7 @@ func (q *queue) Reserve(p *peer) *fetchRequest { skip := make(map[common.Hash]int) capacity := p.Capacity() - for len(send) < space && len(send) < capacity && !q.hashQueue.Empty() { + for proc := 0; proc < space && len(send) < capacity && !q.hashQueue.Empty(); proc++ { hash, priority := q.hashQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) |