diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-05-11 19:26:20 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-05-11 19:26:20 +0800 |
commit | 685862d2ce32294aacb2455bf189ec8e5c4efce3 (patch) | |
tree | 4ac31deffa46d5dff3a704b0594059e8757abca7 /eth/downloader/peer.go | |
parent | 00280e62e3c422b2824a0280015b7b78578ab16d (diff) | |
download | dexon-685862d2ce32294aacb2455bf189ec8e5c4efce3.tar.gz dexon-685862d2ce32294aacb2455bf189ec8e5c4efce3.tar.zst dexon-685862d2ce32294aacb2455bf189ec8e5c4efce3.zip |
eth/downloader: fix #910, thread safe peers & polishes
Diffstat (limited to 'eth/downloader/peer.go')
-rw-r--r-- | eth/downloader/peer.go | 219 |
1 files changed, 143 insertions, 76 deletions
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 45ec1cbfd..e2dec5571 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -1,125 +1,192 @@ +// Contains the active peer-set of the downloader, maintaining both failures +// as well as reputation metrics to prioritize the block retrievals. + package downloader import ( "errors" "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/common" "gopkg.in/fatih/set.v0" ) -const ( - workingState = 2 - idleState = 4 -) - type hashFetcherFn func(common.Hash) error type blockFetcherFn func([]common.Hash) error -// XXX make threadsafe!!!! -type peers map[string]*peer +var ( + errAlreadyFetching = errors.New("already fetching blocks from peer") + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) -func (p peers) reset() { - for _, peer := range p { - peer.reset() - } +// peer represents an active peer from which hashes and blocks are retrieved. +type peer struct { + id string // Unique identifier of the peer + head common.Hash // Hash of the peers latest known block + + idle int32 // Current activity state of the peer (idle = 0, active = 1) + rep int32 // Simple peer reputation (not used currently) + + mu sync.RWMutex + + ignored *set.Set + + getHashes hashFetcherFn + getBlocks blockFetcherFn } -func (p peers) get(state int) []*peer { - var peers []*peer - for _, peer := range p { - peer.mu.RLock() - if peer.state == state { - peers = append(peers, peer) - } - peer.mu.RUnlock() +// newPeer create a new downloader peer, with specific hash and block retrieval +// mechanisms. +func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { + return &peer{ + id: id, + head: head, + getHashes: getHashes, + getBlocks: getBlocks, + ignored: set.New(), } +} - return peers +// Reset clears the internal state of a peer entity. +func (p *peer) Reset() { + atomic.StoreInt32(&p.idle, 0) + p.ignored.Clear() } -func (p peers) setState(id string, state int) { - if peer, exist := p[id]; exist { - peer.mu.Lock() - defer peer.mu.Unlock() - peer.state = state +// Fetch sends a block retrieval request to the remote peer. +func (p *peer) Fetch(request *fetchRequest) error { + // Short circuit if the peer is already fetching + if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { + return errAlreadyFetching } -} + // Convert the hash set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Hashes)) + for hash, _ := range request.Hashes { + hashes = append(hashes, hash) + } + p.getBlocks(hashes) -func (p peers) getPeer(id string) *peer { - return p[id] + return nil } -// peer represents an active peer -type peer struct { - state int // Peer state (working, idle) - rep int // TODO peer reputation +// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. +func (p *peer) SetIdle() { + atomic.StoreInt32(&p.idle, 0) +} - mu sync.RWMutex - id string - recentHash common.Hash +// Promote increases the peer's reputation. +func (p *peer) Promote() { + atomic.AddInt32(&p.rep, 1) +} - ignored *set.Set +// Demote decreases the peer's reputation or leaves it at 0. +func (p *peer) Demote() { + for { + // Calculate the new reputation value + prev := atomic.LoadInt32(&p.rep) + next := prev - 2 + if next < 0 { + next = 0 + } + // Try to update the old value + if atomic.CompareAndSwapInt32(&p.rep, prev, next) { + return + } + } +} - getHashes hashFetcherFn - getBlocks blockFetcherFn +// peerSet represents the collection of active peer participating in the block +// download procedure. +type peerSet struct { + peers map[string]*peer + lock sync.RWMutex } -// create a new peer -func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { - return &peer{ - id: id, - recentHash: hash, - getHashes: getHashes, - getBlocks: getBlocks, - state: idleState, - ignored: set.New(), +// newPeerSet creates a new peer set top track the active download sources. +func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peer), } } -// fetch a chunk using the peer -func (p *peer) fetch(request *fetchRequest) error { - p.mu.Lock() - defer p.mu.Unlock() +// Reset iterates over the current peer set, and resets each of the known peers +// to prepare for a next batch of block retrieval. +func (ps *peerSet) Reset() { + ps.lock.RLock() + defer ps.lock.RUnlock() - if p.state == workingState { - return errors.New("peer already fetching chunk") + for _, peer := range ps.peers { + peer.Reset() } +} - // set working state - p.state = workingState +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. +func (ps *peerSet) Register(p *peer) error { + ps.lock.Lock() + defer ps.lock.Unlock() - // Convert the hash set to a fetchable slice - hashes := make([]common.Hash, 0, len(request.Hashes)) - for hash, _ := range request.Hashes { - hashes = append(hashes, hash) + if _, ok := ps.peers[p.id]; ok { + return errAlreadyRegistered } - p.getBlocks(hashes) + ps.peers[p.id] = p + return nil +} +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[id]; !ok { + return errNotRegistered + } + delete(ps.peers, id) return nil } -// promote increases the peer's reputation -func (p *peer) promote() { - p.mu.Lock() - defer p.mu.Unlock() +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Peers returns if the current number of peers in the set. +func (ps *peerSet) Peers() int { + ps.lock.RLock() + defer ps.lock.RUnlock() - p.rep++ + return len(ps.peers) } -// demote decreases the peer's reputation or leaves it at 0 -func (p *peer) demote() { - p.mu.Lock() - defer p.mu.Unlock() +// AllPeers retrieves a flat list of all the peers within the set. +func (ps *peerSet) AllPeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() - if p.rep > 1 { - p.rep -= 2 - } else { - p.rep = 0 + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) } + return list } -func (p *peer) reset() { - p.state = idleState - p.ignored.Clear() +// IdlePeers retrieves a flat list of all the currently idle peers within the +// active peer set. +func (ps *peerSet) IdlePeers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if atomic.LoadInt32(&p.idle) == 0 { + list = append(list, p) + } + } + return list } |