aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r--eth/downloader/peer.go25
1 files changed, 13 insertions, 12 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 15a912f1f..dc8b09772 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -30,6 +30,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
@@ -195,7 +196,7 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
}
// FetchNodeData sends a node state data retrieval request to the remote peer.
-func (p *peer) FetchNodeData(request *fetchRequest) error {
+func (p *peer) FetchNodeData(hashes []common.Hash) error {
// Sanity check the protocol version
if p.version < 63 {
panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
@@ -205,14 +206,7 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return errAlreadyFetching
}
p.stateStarted = time.Now()
-
- // Convert the hash set to a retrievable slice
- hashes := make([]common.Hash, 0, len(request.Hashes))
- for hash := range request.Hashes {
- hashes = append(hashes, hash)
- }
go p.getNodeData(hashes)
-
return nil
}
@@ -343,8 +337,9 @@ func (p *peer) Lacks(hash common.Hash) bool {
// peerSet represents the collection of active peer participating in the chain
// download procedure.
type peerSet struct {
- peers map[string]*peer
- lock sync.RWMutex
+ peers map[string]*peer
+ newPeerFeed event.Feed
+ lock sync.RWMutex
}
// newPeerSet creates a new peer set top track the active download sources.
@@ -354,6 +349,10 @@ func newPeerSet() *peerSet {
}
}
+func (ps *peerSet) SubscribeNewPeers(ch chan<- *peer) event.Subscription {
+ return ps.newPeerFeed.Subscribe(ch)
+}
+
// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
@@ -377,9 +376,8 @@ func (ps *peerSet) Register(p *peer) error {
// Register the new peer with some meaningful defaults
ps.lock.Lock()
- defer ps.lock.Unlock()
-
if _, ok := ps.peers[p.id]; ok {
+ ps.lock.Unlock()
return errAlreadyRegistered
}
if len(ps.peers) > 0 {
@@ -399,6 +397,9 @@ func (ps *peerSet) Register(p *peer) error {
p.stateThroughput /= float64(len(ps.peers))
}
ps.peers[p.id] = p
+ ps.lock.Unlock()
+
+ ps.newPeerFeed.Send(p)
return nil
}