diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-06-21 18:27:38 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-06-21 18:27:38 +0800 |
commit | a5d08c893d61f66d60d8a91216aee5347b78f93e (patch) | |
tree | 500f3a788ecd4f299692ce1d1069f2efdc79d73d /les/distributor.go | |
parent | 60e27b51bc5643bc6a76151020a9e1a245340b70 (diff) | |
download | dexon-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar.gz dexon-a5d08c893d61f66d60d8a91216aee5347b78f93e.tar.zst dexon-a5d08c893d61f66d60d8a91216aee5347b78f93e.zip |
les: code refactoring (#14416)
This commit does various code refactorings:
- generalizes and moves the request retrieval/timeout/resend logic out of LesOdr
(will be used by a subsequent PR)
- reworks the peer management logic so that all services can register with
peerSet to get notified about added/dropped peers (also gets rid of the ugly
getAllPeers callback in requestDistributor)
- moves peerSet, LesOdr, requestDistributor and retrieveManager initialization
out of ProtocolManager because I believe they do not really belong there and the
whole init process was ugly and ad-hoc
Diffstat (limited to 'les/distributor.go')
-rw-r--r-- | les/distributor.go | 58 |
1 files changed, 42 insertions, 16 deletions
diff --git a/les/distributor.go b/les/distributor.go index 71afe2b73..e8ef5b02e 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -34,11 +34,11 @@ var ErrNoPeers = errors.New("no suitable peers available") type requestDistributor struct { reqQueue *list.List lastReqOrder uint64 + peers map[distPeer]struct{} + peerLock sync.RWMutex stopChn, loopChn chan struct{} loopNextSent bool lock sync.Mutex - - getAllPeers func() map[distPeer]struct{} } // distPeer is an LES server peer interface for the request distributor. @@ -71,15 +71,39 @@ type distReq struct { } // newRequestDistributor creates a new request distributor -func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor { - r := &requestDistributor{ - reqQueue: list.New(), - loopChn: make(chan struct{}, 2), - stopChn: stopChn, - getAllPeers: getAllPeers, +func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor { + d := &requestDistributor{ + reqQueue: list.New(), + loopChn: make(chan struct{}, 2), + stopChn: stopChn, + peers: make(map[distPeer]struct{}), + } + if peers != nil { + peers.notify(d) } - go r.loop() - return r + go d.loop() + return d +} + +// registerPeer implements peerSetNotify +func (d *requestDistributor) registerPeer(p *peer) { + d.peerLock.Lock() + d.peers[p] = struct{}{} + d.peerLock.Unlock() +} + +// unregisterPeer implements peerSetNotify +func (d *requestDistributor) unregisterPeer(p *peer) { + d.peerLock.Lock() + delete(d.peers, p) + d.peerLock.Unlock() +} + +// registerTestPeer adds a new test peer +func (d *requestDistributor) registerTestPeer(p distPeer) { + d.peerLock.Lock() + d.peers[p] = struct{}{} + d.peerLock.Unlock() } // distMaxWait is the maximum waiting time after which further necessary waiting @@ -152,8 +176,7 @@ func (sp selectPeerItem) Weight() int64 { // nextRequest returns the next possible request from any peer, along with the // associated peer and necessary waiting time func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { - peers := d.getAllPeers() - + checkedPeers := make(map[distPeer]struct{}) elem := d.reqQueue.Front() var ( bestPeer distPeer @@ -162,11 +185,14 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { sel *weightedRandomSelect ) - for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { + d.peerLock.RLock() + defer d.peerLock.RUnlock() + + for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { req := elem.Value.(*distReq) canSend := false - for peer, _ := range peers { - if peer.canQueue() && req.canSend(peer) { + for peer, _ := range d.peers { + if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) { canSend = true cost := req.getCost(peer) wait, bufRemain := peer.waitBefore(cost) @@ -182,7 +208,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { bestWait = wait } } - delete(peers, peer) + checkedPeers[peer] = struct{}{} } } next := elem.Next() |