diff options
Diffstat (limited to 'les/odr.go')
-rw-r--r-- | les/odr.go | 99 |
1 files changed, 55 insertions, 44 deletions
diff --git a/les/odr.go b/les/odr.go index afc894ab5..06b44d318 100644 --- a/les/odr.go +++ b/les/odr.go @@ -32,14 +32,12 @@ import ( var ( softRequestTimeout = time.Millisecond * 500 hardRequestTimeout = time.Second * 10 - retryPeers = time.Second * 1 ) // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) type odrPeerSelector interface { - selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer adjustResponseTime(*poolEntry, time.Duration, bool) } @@ -51,6 +49,7 @@ type LesOdr struct { mlock, clock sync.Mutex sentReqs map[uint64]*sentReq serverPool odrPeerSelector + reqDist *requestDistributor } func NewLesOdr(db ethdb.Database) *LesOdr { @@ -165,69 +164,81 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error { answered := make(chan struct{}) req := &sentReq{ - valFunc: lreq.Valid, + valFunc: lreq.Validate, sentTo: make(map[*peer]chan struct{}), answered: answered, // reply delivered by any peer } - reqID := getNextReqID() - self.mlock.Lock() - self.sentReqs[reqID] = req - self.mlock.Unlock() + + exclude := make(map[*peer]struct{}) reqWg := new(sync.WaitGroup) reqWg.Add(1) defer reqWg.Done() - go func() { - reqWg.Wait() - self.mlock.Lock() - delete(self.sentReqs, reqID) - self.mlock.Unlock() - }() - exclude := make(map[*peer]struct{}) - for { - var p *peer - if self.serverPool != nil { - p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) { - if _, ok := exclude[p]; ok || !lreq.CanSend(p) { - return false, 0 - } - return true, p.fcServer.CanSend(lreq.GetCost(p)) - }, ctx.Done()) - } - if p == nil { - select { - case <-ctx.Done(): - return ctx.Err() - case <-req.answered: - return nil - case <-time.After(retryPeers): - } - } else { + var timeout chan struct{} + reqID := getNextReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + return lreq.GetCost(dp.(*peer)) + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + _, ok := exclude[p] + return !ok && lreq.CanSend(p) + }, + request: func(dp distPeer) func() { + p := dp.(*peer) exclude[p] = struct{}{} delivered := make(chan struct{}) - timeout := make(chan struct{}) + timeout = make(chan struct{}) req.lock.Lock() req.sentTo[p] = delivered req.lock.Unlock() reqWg.Add(1) cost := lreq.GetCost(p) - p.fcServer.SendRequest(reqID, cost) + p.fcServer.QueueRequest(reqID, cost) go self.requestPeer(req, p, delivered, timeout, reqWg) - lreq.Request(reqID, p) - - select { - case <-ctx.Done(): - return ctx.Err() - case <-answered: - return nil - case <-timeout: + return func() { lreq.Request(reqID, p) } + }, + } + + self.mlock.Lock() + self.sentReqs[reqID] = req + self.mlock.Unlock() + + go func() { + reqWg.Wait() + self.mlock.Lock() + delete(self.sentReqs, reqID) + self.mlock.Unlock() + }() + + for { + peerChn := self.reqDist.queue(rq) + select { + case <-ctx.Done(): + self.reqDist.cancel(rq) + return ctx.Err() + case <-answered: + self.reqDist.cancel(rq) + return nil + case _, ok := <-peerChn: + if !ok { + return ErrNoPeers } } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-answered: + return nil + case <-timeout: + } } } -// Retrieve tries to fetch an object from the local db, then from the LES network. +// Retrieve tries to fetch an object from the LES network. // If the network retrieval was successful, it stores the object in local db. func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { lreq := LesRequest(req) |