diff options
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 107 |
1 files changed, 79 insertions, 28 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 5fc0db587..5011d5d46 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type blockBodyFetcherFn func([]common.Hash) error type receiptFetcherFn func([]common.Hash) error +type stateFetcherFn func([]common.Hash) error var ( errAlreadyFetching = errors.New("already fetching blocks from peer") @@ -55,12 +56,16 @@ type peer struct { blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) + stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) rep int32 // Simple peer reputation - blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request - receiptCapacity int32 // Number of receipts allowed to fetch per request - blockStarted time.Time // Time instance when the last block (body)fetch was started - receiptStarted time.Time // Time instance when the last receipt fetch was started + blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request + receiptCapacity int32 // Number of receipts allowed to fetch per request + stateCapacity int32 // Number of node data pieces allowed to fetch per request + + blockStarted time.Time // Time instance when the last block (body)fetch was started + receiptStarted time.Time // Time instance when the last receipt fetch was started + stateStarted time.Time // Time instance when the last node data fetch was started ignored *set.Set // Set of hashes not to request (didn't have previously) @@ -73,6 +78,7 @@ type peer struct { getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts + getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data version int // Eth protocol version number to switch strategies } @@ -82,12 +88,13 @@ type peer struct { func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, - getReceipts receiptFetcherFn) *peer { + getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ id: id, head: head, blockCapacity: 1, receiptCapacity: 1, + stateCapacity: 1, ignored: set.New(), getRelHashes: getRelHashes, @@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash, getBlockBodies: getBlockBodies, getReceipts: getReceipts, + getNodeData: getNodeData, version: version, } @@ -110,6 +118,7 @@ func (p *peer) Reset() { atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.blockCapacity, 1) atomic.StoreInt32(&p.receiptCapacity, 1) + atomic.StoreInt32(&p.stateCapacity, 1) p.ignored.Clear() } @@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { return nil } +// FetchNodeData sends a node state data retrieval request to the remote peer. +func (p *peer) FetchNodeData(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) { + return errAlreadyFetching + } + p.stateStarted = time.Now() + + // Convert the hash set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Hashes)) + for hash, _ := range request.Hashes { + hashes = append(hashes, hash) + } + go p.getNodeData(hashes) + + return nil +} + // SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its block retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. @@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() { p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) } +// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval +// requests. Its node data retrieval allowance will also be updated either up- or +// downwards, depending on whether the previous fetch completed in time or not. +func (p *peer) SetNodeDataIdle() { + p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) +} + // setIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its data retrieval allowance will also be updated either up- or downwards, // depending on whether the previous fetch completed in time or not. @@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int { return int(atomic.LoadInt32(&p.receiptCapacity)) } +// NodeDataCapacity retrieves the peers block download allowance based on its +// previously discovered bandwidth capacity. +func (p *peer) NodeDataCapacity() int { + return int(atomic.LoadInt32(&p.stateCapacity)) +} + // Promote increases the peer's reputation. func (p *peer) Promote() { atomic.AddInt32(&p.rep, 1) @@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer { // BlockIdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. -func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) { - ps.lock.RLock() - defer ps.lock.RUnlock() - - idle, total := make([]*peer, 0, len(ps.peers)), 0 - for _, p := range ps.peers { - if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) { - if atomic.LoadInt32(&p.blockIdle) == 0 { - idle = append(idle, p) - } - total++ - } +func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 } - for i := 0; i < len(idle); i++ { - for j := i + 1; j < len(idle); j++ { - if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { - idle[i], idle[j] = idle[j], idle[i] - } - } + return ps.idlePeers(61, 61, idle) +} + +// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within +// the active peer set, ordered by their reputation. +func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.blockIdle) == 0 } - return idle, total + return ps.idlePeers(62, 64, idle) } -// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the -// active peer set, ordered by their reputation. +// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers +// within the active peer set, ordered by their reputation. func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.receiptIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle +// peers within the active peer set, ordered by their reputation. +func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { + idle := func(p *peer) bool { + return atomic.LoadInt32(&p.stateIdle) == 0 + } + return ps.idlePeers(63, 64, idle) +} + +// idlePeers retrieves a flat list of all currently idle peers satisfying the +// protocol version constraints, using the provided function to check idleness. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() idle, total := make([]*peer, 0, len(ps.peers)), 0 for _, p := range ps.peers { - if p.version >= 63 { - if atomic.LoadInt32(&p.receiptIdle) == 0 { + if p.version >= minProtocol && p.version <= maxProtocol { + if idleCheck(p) { idle = append(idle, p) } total++ |