aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-03-23 03:44:22 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-03-23 03:44:22 +0800
commit525116dbff916825463931361f75e75e955c12e2 (patch)
treeb272801c420ad9a591f227919567c7952b0bd512 /les/handler.go
parent1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff)
downloadgo-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.gz
go-tangerine-525116dbff916825463931361f75e75e955c12e2.tar.zst
go-tangerine-525116dbff916825463931361f75e75e955c12e2.zip
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues * core: moved header validation before chain mutex lock
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go66
1 files changed, 57 insertions, 9 deletions
diff --git a/les/handler.go b/les/handler.go
index 4271da8b8..ece2060ee 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -102,6 +102,7 @@ type ProtocolManager struct {
odr *LesOdr
server *LesServer
serverPool *serverPool
+ reqDist *requestDistributor
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -203,8 +204,17 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
}
+ manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
+ m := make(map[distPeer]struct{})
+ peers := manager.peers.AllPeers()
+ for _, peer := range peers {
+ m[peer] = struct{}{}
+ }
+ return m
+ }, manager.quitSync)
if odr != nil {
odr.removePeer = removePeer
+ odr.reqDist = manager.reqDist
}
/*validator := func(block *types.Block, parent *types.Block) error {
@@ -334,17 +344,49 @@ func (pm *ProtocolManager) handle(p *peer) error {
if pm.lightSync {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
- cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
- return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
}
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
- cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
- return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
}
if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
@@ -884,7 +926,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
if deliverMsg != nil {
- return pm.odr.Deliver(p, deliverMsg)
+ err := pm.odr.Deliver(p, deliverMsg)
+ if err != nil {
+ p.responseErrors++
+ if p.responseErrors > maxResponseErrors {
+ return err
+ }
+ }
}
return nil
}