aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go107
1 files changed, 79 insertions, 28 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 5fc0db587..5011d5d46 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
type receiptFetcherFn func([]common.Hash) error
+type stateFetcherFn func([]common.Hash) error
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
@@ -55,12 +56,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
+ stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
rep int32 // Simple peer reputation
- blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
- receiptCapacity int32 // Number of receipts allowed to fetch per request
- blockStarted time.Time // Time instance when the last block (body)fetch was started
- receiptStarted time.Time // Time instance when the last receipt fetch was started
+ blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
+ receiptCapacity int32 // Number of receipts allowed to fetch per request
+ stateCapacity int32 // Number of node data pieces allowed to fetch per request
+
+ blockStarted time.Time // Time instance when the last block (body)fetch was started
+ receiptStarted time.Time // Time instance when the last receipt fetch was started
+ stateStarted time.Time // Time instance when the last node data fetch was started
ignored *set.Set // Set of hashes not to request (didn't have previously)
@@ -73,6 +78,7 @@ type peer struct {
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
+ getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies
}
@@ -82,12 +88,13 @@ type peer struct {
func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
- getReceipts receiptFetcherFn) *peer {
+ getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{
id: id,
head: head,
blockCapacity: 1,
receiptCapacity: 1,
+ stateCapacity: 1,
ignored: set.New(),
getRelHashes: getRelHashes,
@@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash,
getBlockBodies: getBlockBodies,
getReceipts: getReceipts,
+ getNodeData: getNodeData,
version: version,
}
@@ -110,6 +118,7 @@ func (p *peer) Reset() {
atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.blockCapacity, 1)
atomic.StoreInt32(&p.receiptCapacity, 1)
+ atomic.StoreInt32(&p.stateCapacity, 1)
p.ignored.Clear()
}
@@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
return nil
}
+// FetchNodeData sends a node state data retrieval request to the remote peer.
+func (p *peer) FetchNodeData(request *fetchRequest) error {
+ // Short circuit if the peer is already fetching
+ if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) {
+ return errAlreadyFetching
+ }
+ p.stateStarted = time.Now()
+
+ // Convert the hash set to a retrievable slice
+ hashes := make([]common.Hash, 0, len(request.Hashes))
+ for hash, _ := range request.Hashes {
+ hashes = append(hashes, hash)
+ }
+ go p.getNodeData(hashes)
+
+ return nil
+}
+
// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
@@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() {
p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
}
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
+// requests. Its node data retrieval allowance will also be updated either up- or
+// downwards, depending on whether the previous fetch completed in time or not.
+func (p *peer) SetNodeDataIdle() {
+ p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
+}
+
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its data retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
@@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int {
return int(atomic.LoadInt32(&p.receiptCapacity))
}
+// NodeDataCapacity retrieves the peers block download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) NodeDataCapacity() int {
+ return int(atomic.LoadInt32(&p.stateCapacity))
+}
+
// Promote increases the peer's reputation.
func (p *peer) Promote() {
atomic.AddInt32(&p.rep, 1)
@@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer {
// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
-func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- idle, total := make([]*peer, 0, len(ps.peers)), 0
- for _, p := range ps.peers {
- if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
- if atomic.LoadInt32(&p.blockIdle) == 0 {
- idle = append(idle, p)
- }
- total++
- }
+func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.blockIdle) == 0
}
- for i := 0; i < len(idle); i++ {
- for j := i + 1; j < len(idle); j++ {
- if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
- idle[i], idle[j] = idle[j], idle[i]
- }
- }
+ return ps.idlePeers(61, 61, idle)
+}
+
+// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
+// the active peer set, ordered by their reputation.
+func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.blockIdle) == 0
}
- return idle, total
+ return ps.idlePeers(62, 64, idle)
}
-// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the
-// active peer set, ordered by their reputation.
+// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
+// within the active peer set, ordered by their reputation.
func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.receiptIdle) == 0
+ }
+ return ps.idlePeers(63, 64, idle)
+}
+
+// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
+// peers within the active peer set, ordered by their reputation.
+func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.stateIdle) == 0
+ }
+ return ps.idlePeers(63, 64, idle)
+}
+
+// idlePeers retrieves a flat list of all currently idle peers satisfying the
+// protocol version constraints, using the provided function to check idleness.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
idle, total := make([]*peer, 0, len(ps.peers)), 0
for _, p := range ps.peers {
- if p.version >= 63 {
- if atomic.LoadInt32(&p.receiptIdle) == 0 {
+ if p.version >= minProtocol && p.version <= maxProtocol {
+ if idleCheck(p) {
idle = append(idle, p)
}
total++