diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-03-23 03:44:22 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-03-23 03:44:22 +0800 |
commit | 525116dbff916825463931361f75e75e955c12e2 (patch) | |
tree | b272801c420ad9a591f227919567c7952b0bd512 /les/handler.go | |
parent | 1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff) | |
download | dexon-525116dbff916825463931361f75e75e955c12e2.tar.gz dexon-525116dbff916825463931361f75e75e955c12e2.tar.zst dexon-525116dbff916825463931361f75e75e955c12e2.zip |
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
Diffstat (limited to 'les/handler.go')
-rw-r--r-- | les/handler.go | 66 |
1 files changed, 57 insertions, 9 deletions
diff --git a/les/handler.go b/les/handler.go index 4271da8b8..ece2060ee 100644 --- a/les/handler.go +++ b/les/handler.go @@ -102,6 +102,7 @@ type ProtocolManager struct { odr *LesOdr server *LesServer serverPool *serverPool + reqDist *requestDistributor downloader *downloader.Downloader fetcher *lightFetcher @@ -203,8 +204,17 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) } + manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} { + m := make(map[distPeer]struct{}) + peers := manager.peers.AllPeers() + for _, peer := range peers { + m[peer] = struct{}{} + } + return m + }, manager.quitSync) if odr != nil { odr.removePeer = removePeer + odr.reqDist = manager.reqDist } /*validator := func(block *types.Block, parent *types.Block) error { @@ -334,17 +344,49 @@ func (pm *ProtocolManager) handle(p *peer) error { if pm.lightSync { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { reqID := getNextReqID() - cost := p.GetRequestCost(GetBlockHeadersMsg, amount) - p.fcServer.MustAssignRequest(reqID) - p.fcServer.SendRequest(reqID, cost) - return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil } requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { reqID := getNextReqID() - cost := p.GetRequestCost(GetBlockHeadersMsg, amount) - p.fcServer.MustAssignRequest(reqID) - p.fcServer.SendRequest(reqID, cost) - return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil } if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { @@ -884,7 +926,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } if deliverMsg != nil { - return pm.odr.Deliver(p, deliverMsg) + err := pm.odr.Deliver(p, deliverMsg) + if err != nil { + p.responseErrors++ + if p.responseErrors > maxResponseErrors { + return err + } + } } return nil } |