aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go58
1 files changed, 53 insertions, 5 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 4abae8d5e..9614a6951 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -5,8 +5,11 @@ package downloader
import (
"errors"
+ "fmt"
+ "math"
"sync"
"sync/atomic"
+ "time"
"github.com/ethereum/go-ethereum/common"
"gopkg.in/fatih/set.v0"
@@ -27,14 +30,15 @@ type peer struct {
head common.Hash // Hash of the peers latest known block
idle int32 // Current activity state of the peer (idle = 0, active = 1)
- rep int32 // Simple peer reputation (not used currently)
+ rep int32 // Simple peer reputation
- mu sync.RWMutex
+ capacity int32 // Number of blocks allowed to fetch per request
+ started time.Time // Time instance when the last fetch was started
- ignored *set.Set
+ ignored *set.Set // Set of hashes not to request (didn't have previously)
- getHashes hashFetcherFn
- getBlocks blockFetcherFn
+ getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing)
+ getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing)
}
// newPeer create a new downloader peer, with specific hash and block retrieval
@@ -43,6 +47,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
return &peer{
id: id,
head: head,
+ capacity: 1,
getHashes: getHashes,
getBlocks: getBlocks,
ignored: set.New(),
@@ -52,6 +57,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
atomic.StoreInt32(&p.idle, 0)
+ atomic.StoreInt32(&p.capacity, 1)
p.ignored.Clear()
}
@@ -61,6 +67,8 @@ func (p *peer) Fetch(request *fetchRequest) error {
if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
return errAlreadyFetching
}
+ p.started = time.Now()
+
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
for hash, _ := range request.Hashes {
@@ -72,10 +80,41 @@ func (p *peer) Fetch(request *fetchRequest) error {
}
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its block retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
func (p *peer) SetIdle() {
+ // Update the peer's download allowance based on previous performance
+ scale := 2.0
+ if time.Since(p.started) > blockSoftTTL {
+ scale = 0.5
+ if time.Since(p.started) > blockHardTTL {
+ scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1
+ }
+ }
+ for {
+ // Calculate the new download bandwidth allowance
+ prev := atomic.LoadInt32(&p.capacity)
+ next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale)))
+
+ // Try to update the old value
+ if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
+ // If we're having problems at 1 capacity, try to find better peers
+ if next == 1 {
+ p.Demote()
+ }
+ break
+ }
+ }
+ // Set the peer to idle to allow further block requests
atomic.StoreInt32(&p.idle, 0)
}
+// Capacity retrieves the peers block download allowance based on its previously
+// discovered bandwidth capacity.
+func (p *peer) Capacity() int {
+ return int(atomic.LoadInt32(&p.capacity))
+}
+
// Promote increases the peer's reputation.
func (p *peer) Promote() {
atomic.AddInt32(&p.rep, 1)
@@ -95,6 +134,15 @@ func (p *peer) Demote() {
}
}
+// String implements fmt.Stringer.
+func (p *peer) String() string {
+ return fmt.Sprintf("Peer %s [%s]", p.id,
+ fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
+ fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+
+ fmt.Sprintf("ignored %4d", p.ignored.Size()),
+ )
+}
+
// peerSet represents the collection of active peer participating in the block
// download procedure.
type peerSet struct {