aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-01-09 22:58:23 +0800
committerGitHub <noreply@github.com>2017-01-09 22:58:23 +0800
commit681b51aac46fa11c235e9ac1af1228e0105a0720 (patch)
treeecbd5467a5f7356b2386189a67b8629663068cec /les/handler.go
parent4268cb8efecceaf506e1b0b71e06714a838a49a8 (diff)
parent66979aa468b6329aabf49542bd3db14e59010c20 (diff)
downloaddexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.gz
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.tar.zst
dexon-681b51aac46fa11c235e9ac1af1228e0105a0720.zip
Merge pull request #3519 from zsfelfoldi/light-topic5
les: fixed selectPeer deadlock, improved request distribution
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go69
1 files changed, 36 insertions, 33 deletions
diff --git a/les/handler.go b/les/handler.go
index b024841f2..603ce9ad4 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -24,6 +24,7 @@ import (
"math/big"
"net"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -228,6 +229,12 @@ func (pm *ProtocolManager) removePeer(id string) {
if peer == nil {
return
}
+ if err := pm.peers.Unregister(id); err != nil {
+ if err == errNotRegistered {
+ return
+ }
+ glog.V(logger.Error).Infoln("Removal failed:", err)
+ }
glog.V(logger.Debug).Infoln("Removing peer", id)
// Unregister the peer from the downloader and Ethereum peer set
@@ -241,9 +248,6 @@ func (pm *ProtocolManager) removePeer(id string) {
pm.fetcher.removePeer(peer)
}
}
- if err := pm.peers.Unregister(id); err != nil {
- glog.V(logger.Error).Infoln("Removal failed:", err)
- }
// Hard disconnect at the networking layer
if peer != nil {
peer.Peer.Disconnect(p2p.DiscUselessPeer)
@@ -340,12 +344,14 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
+ p.fcServer.MustAssignRequest(reqID)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
}
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
+ p.fcServer.MustAssignRequest(reqID)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
}
@@ -404,26 +410,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return err
}
- var costs *requestCosts
- var reqCnt, maxReqs int
-
glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size)
- if rc, ok := p.fcCosts[msg.Code]; ok { // check if msg is a supported request type
- costs = rc
- if p.fcClient == nil {
- return errResp(ErrRequestRejected, "")
+
+ costs := p.fcCosts[msg.Code]
+ reject := func(reqCnt, maxCnt uint64) bool {
+ if p.fcClient == nil || reqCnt > maxCnt {
+ return true
}
- bv, ok := p.fcClient.AcceptRequest()
- if !ok || bv < costs.baseCost {
- return errResp(ErrRequestRejected, "")
+ bufValue, _ := p.fcClient.AcceptRequest()
+ cost := costs.baseCost + reqCnt*costs.reqCost
+ if cost > pm.server.defParams.BufLimit {
+ cost = pm.server.defParams.BufLimit
}
- maxReqs = 10000
- if bv < pm.server.defParams.BufLimit {
- d := bv - costs.baseCost
- if d/10000 < costs.reqCost {
- maxReqs = int(d / costs.reqCost)
- }
+ if cost > bufValue {
+ glog.V(logger.Error).Infof("Request from %v came %v too early", p.id, time.Duration((cost-bufValue)*1000000/pm.server.defParams.MinRecharge))
+ return true
}
+ return false
}
if msg.Size > ProtocolMaxMsgSize {
@@ -450,7 +453,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
if pm.fetcher != nil {
- go pm.fetcher.announce(p, &req)
+ pm.fetcher.announce(p, &req)
}
case GetBlockHeadersMsg:
@@ -465,7 +468,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
query := req.Query
- if query.Amount > uint64(maxReqs) || query.Amount > MaxHeaderFetch {
+ if reject(query.Amount, MaxHeaderFetch) {
return errResp(ErrRequestRejected, "")
}
@@ -573,8 +576,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
bodies []rlp.RawValue
)
- reqCnt = len(req.Hashes)
- if reqCnt > maxReqs || reqCnt > MaxBodyFetch {
+ reqCnt := len(req.Hashes)
+ if reject(uint64(reqCnt), MaxBodyFetch) {
return errResp(ErrRequestRejected, "")
}
for _, hash := range req.Hashes {
@@ -627,8 +630,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
data [][]byte
)
- reqCnt = len(req.Reqs)
- if reqCnt > maxReqs || reqCnt > MaxCodeFetch {
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxCodeFetch) {
return errResp(ErrRequestRejected, "")
}
for _, req := range req.Reqs {
@@ -688,8 +691,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
receipts []rlp.RawValue
)
- reqCnt = len(req.Hashes)
- if reqCnt > maxReqs || reqCnt > MaxReceiptFetch {
+ reqCnt := len(req.Hashes)
+ if reject(uint64(reqCnt), MaxReceiptFetch) {
return errResp(ErrRequestRejected, "")
}
for _, hash := range req.Hashes {
@@ -751,8 +754,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
proofs proofsData
)
- reqCnt = len(req.Reqs)
- if reqCnt > maxReqs || reqCnt > MaxProofsFetch {
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxProofsFetch) {
return errResp(ErrRequestRejected, "")
}
for _, req := range req.Reqs {
@@ -818,8 +821,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
proofs []ChtResp
)
- reqCnt = len(req.Reqs)
- if reqCnt > maxReqs || reqCnt > MaxHeaderProofsFetch {
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
return errResp(ErrRequestRejected, "")
}
for _, req := range req.Reqs {
@@ -872,8 +875,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- reqCnt = len(txs)
- if reqCnt > maxReqs || reqCnt > MaxTxSend {
+ reqCnt := len(txs)
+ if reject(uint64(reqCnt), MaxTxSend) {
return errResp(ErrRequestRejected, "")
}