diff options
Diffstat (limited to 'les/odr.go')
-rw-r--r-- | les/odr.go | 73 |
1 files changed, 36 insertions, 37 deletions
diff --git a/les/odr.go b/les/odr.go index 444b1da2a..8878508c4 100644 --- a/les/odr.go +++ b/les/odr.go @@ -17,6 +17,8 @@ package les import ( + "crypto/rand" + "encoding/binary" "sync" "time" @@ -37,6 +39,11 @@ var ( // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) +type odrPeerSelector interface { + selectPeer(func(*peer) (bool, uint64)) *peer + adjustResponseTime(*poolEntry, time.Duration, bool) +} + type LesOdr struct { light.OdrBackend db ethdb.Database @@ -44,15 +51,13 @@ type LesOdr struct { removePeer peerDropFn mlock, clock sync.Mutex sentReqs map[uint64]*sentReq - peers *odrPeerSet - lastReqID uint64 + serverPool odrPeerSelector } func NewLesOdr(db ethdb.Database) *LesOdr { return &LesOdr{ db: db, stop: make(chan struct{}), - peers: newOdrPeerSet(), sentReqs: make(map[uint64]*sentReq), } } @@ -77,16 +82,6 @@ type sentReq struct { answered chan struct{} // closed and set to nil when any peer answers it } -// RegisterPeer registers a new LES peer to the ODR capable peer set -func (self *LesOdr) RegisterPeer(p *peer) error { - return self.peers.register(p) -} - -// UnregisterPeer removes a peer from the ODR capable peer set -func (self *LesOdr) UnregisterPeer(p *peer) { - self.peers.unregister(p) -} - const ( MsgBlockBodies = iota MsgCode @@ -142,29 +137,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha select { case <-delivered: - servTime := uint64(mclock.Now() - stime) - self.peers.updateTimeout(peer, false) - self.peers.updateServTime(peer, servTime) + if self.serverPool != nil { + self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false) + } return case <-time.After(softRequestTimeout): close(timeout) - if self.peers.updateTimeout(peer, true) { - self.removePeer(peer.id) - } case <-self.stop: return } select { case <-delivered: - servTime := uint64(mclock.Now() - stime) - self.peers.updateServTime(peer, servTime) - return case <-time.After(hardRequestTimeout): - self.removePeer(peer.id) + go self.removePeer(peer.id) case <-self.stop: return } + if self.serverPool != nil { + self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true) + } } // networkRequest sends a request to known peers until an answer is received @@ -176,7 +168,7 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro sentTo: make(map[*peer]chan struct{}), answered: answered, // reply delivered by any peer } - reqID := self.getNextReqID() + reqID := getNextReqID() self.mlock.Lock() self.sentReqs[reqID] = req self.mlock.Unlock() @@ -193,7 +185,16 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro exclude := make(map[*peer]struct{}) for { - if peer := self.peers.bestPeer(lreq, exclude); peer == nil { + var p *peer + if self.serverPool != nil { + p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) { + if !lreq.CanSend(p) { + return false, 0 + } + return true, p.fcServer.CanSend(lreq.GetCost(p)) + }) + } + if p == nil { select { case <-ctx.Done(): return ctx.Err() @@ -202,17 +203,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro case <-time.After(retryPeers): } } else { - exclude[peer] = struct{}{} + exclude[p] = struct{}{} delivered := make(chan struct{}) timeout := make(chan struct{}) req.lock.Lock() - req.sentTo[peer] = delivered + req.sentTo[p] = delivered req.lock.Unlock() reqWg.Add(1) - cost := lreq.GetCost(peer) - peer.fcServer.SendRequest(reqID, cost) - go self.requestPeer(req, peer, delivered, timeout, reqWg) - lreq.Request(reqID, peer) + cost := lreq.GetCost(p) + p.fcServer.SendRequest(reqID, cost) + go self.requestPeer(req, p, delivered, timeout, reqWg) + lreq.Request(reqID, p) select { case <-ctx.Done(): @@ -239,10 +240,8 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err return } -func (self *LesOdr) getNextReqID() uint64 { - self.clock.Lock() - defer self.clock.Unlock() - - self.lastReqID++ - return self.lastReqID +func getNextReqID() uint64 { + var rnd [8]byte + rand.Read(rnd[:]) + return binary.BigEndian.Uint64(rnd[:]) } |