aboutsummaryrefslogtreecommitdiffstats
path: root/les/peer.go
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-03-23 03:44:22 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-03-23 03:44:22 +0800
commit525116dbff916825463931361f75e75e955c12e2 (patch)
treeb272801c420ad9a591f227919567c7952b0bd512 /les/peer.go
parent1c1dc0e0fc41d871aa17377d407515f437d3a54d (diff)
downloaddexon-525116dbff916825463931361f75e75e955c12e2.tar.gz
dexon-525116dbff916825463931361f75e75e955c12e2.tar.zst
dexon-525116dbff916825463931361f75e75e955c12e2.zip
les: implement request distributor, fix blocking issues (#3660)
* les: implement request distributor, fix blocking issues * core: moved header validation before chain mutex lock
Diffstat (limited to 'les/peer.go')
-rw-r--r--les/peer.go35
1 files changed, 27 insertions, 8 deletions
diff --git a/les/peer.go b/les/peer.go
index ef5f8a6ce..4793da296 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -22,6 +22,7 @@ import (
"fmt"
"math/big"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -37,7 +38,10 @@ var (
errNotRegistered = errors.New("peer is not registered")
)
-const maxHeadInfoLen = 20
+const (
+ maxHeadInfoLen = 20
+ maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
+)
type peer struct {
*p2p.Peer
@@ -53,9 +57,11 @@ type peer struct {
lock sync.RWMutex
announceChn chan announceData
+ sendQueue *execQueue
- poolEntry *poolEntry
- hasBlock func(common.Hash, uint64) bool
+ poolEntry *poolEntry
+ hasBlock func(common.Hash, uint64) bool
+ responseErrors int
fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
@@ -76,6 +82,14 @@ func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
}
}
+func (p *peer) canQueue() bool {
+ return p.sendQueue.canQueue()
+}
+
+func (p *peer) queueSend(f func()) {
+ p.sendQueue.queue(f)
+}
+
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *eth.PeerInfo {
return &eth.PeerInfo{
@@ -117,6 +131,11 @@ func (p *peer) Td() *big.Int {
return new(big.Int).Set(p.headInfo.Td)
}
+// waitBefore implements distPeer interface
+func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
+ return p.fcServer.CanSend(maxCost)
+}
+
func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
type req struct {
ReqID uint64
@@ -237,11 +256,8 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error {
return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
}
-func (p *peer) SendTxs(cost uint64, txs types.Transactions) error {
+func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
p.Log().Debug("Fetching batch of transactions", "count", len(txs))
- reqID := getNextReqID()
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
return p2p.Send(p.rw, SendTxMsg, txs)
}
@@ -444,6 +460,7 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered
}
ps.peers[p.id] = p
+ p.sendQueue = newExecQueue(100)
return nil
}
@@ -453,8 +470,10 @@ func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
defer ps.lock.Unlock()
- if _, ok := ps.peers[id]; !ok {
+ if p, ok := ps.peers[id]; !ok {
return errNotRegistered
+ } else {
+ p.sendQueue.quit()
}
delete(ps.peers, id)
return nil