From 0de9f32ae8346ece3bbd596291f132385962d0ca Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 10 Apr 2019 10:44:19 +0200 Subject: les: backported new SendTx cost calculation --- les/handler_test.go | 5 +++-- les/peer.go | 37 ++++++++++++++++++++++++++++++++++--- les/txrelay.go | 8 +++++--- 3 files changed, 42 insertions(+), 8 deletions(-) (limited to 'les') diff --git a/les/handler_test.go b/les/handler_test.go index 43be7f41b..a76848279 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -508,8 +508,9 @@ func TestTransactionStatusLes2(t *testing.T) { test := func(tx *types.Transaction, send bool, expStatus txStatus) { reqID++ if send { - cost := peer.GetRequestCost(SendTxV2Msg, 1) - sendRequest(peer.app, SendTxV2Msg, reqID, cost, types.Transactions{tx}) + enc, _ := rlp.EncodeToBytes(types.Transactions{tx}) + cost := peer.GetTxRelayCost(1, len(enc)) + sendRequest(peer.app, SendTxV2Msg, reqID, cost, rlp.RawValue(enc)) } else { cost := peer.GetRequestCost(GetTxStatusMsg, 1) sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()}) diff --git a/les/peer.go b/les/peer.go index 678384f0e..bf097f666 100644 --- a/les/peer.go +++ b/les/peer.go @@ -42,6 +42,11 @@ var ( const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) +// if the total encoded size of a sent transaction batch is over txSizeCostLimit +// per transaction then the request cost is calculated as proportional to the +// encoded size instead of the transaction count +const txSizeCostLimit = 0x4000 + const ( announceTypeNone = iota announceTypeSimple @@ -170,6 +175,32 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { return cost } +func (p *peer) GetTxRelayCost(amount, size int) uint64 { + p.lock.RLock() + defer p.lock.RUnlock() + + var msgcode uint64 + switch p.version { + case lpv1: + msgcode = SendTxMsg + case lpv2: + msgcode = SendTxV2Msg + default: + panic(nil) + } + + cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) + sizeCost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(size)/txSizeCostLimit + if sizeCost > cost { + cost = sizeCost + } + + if cost > p.fcServerParams.BufLimit { + cost = p.fcServerParams.BufLimit + } + return cost +} + // HasBlock checks if the peer has a given block func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool { p.lock.RLock() @@ -307,9 +338,9 @@ func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes) } -// SendTxStatus sends a batch of transactions to be added to the remote transaction pool. -func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error { - p.Log().Debug("Fetching batch of transactions", "count", len(txs)) +// SendTxs sends a batch of transactions to be added to the remote transaction pool. +func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error { + p.Log().Debug("Fetching batch of transactions", "size", len(txs)) switch p.version { case lpv1: return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID diff --git a/les/txrelay.go b/les/txrelay.go index 7a02cc837..5decd6e93 100644 --- a/les/txrelay.go +++ b/les/txrelay.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" ) type ltrInfo struct { @@ -113,21 +114,22 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { for p, list := range sendTo { pp := p ll := list + enc, _ := rlp.EncodeToBytes(ll) reqID := genReqID() rq := &distReq{ getCost: func(dp distPeer) uint64 { peer := dp.(*peer) - return peer.GetRequestCost(SendTxMsg, len(ll)) + return peer.GetTxRelayCost(len(ll), len(enc)) }, canSend: func(dp distPeer) bool { return dp.(*peer) == pp }, request: func(dp distPeer) func() { peer := dp.(*peer) - cost := peer.GetRequestCost(SendTxMsg, len(ll)) + cost := peer.GetTxRelayCost(len(ll), len(enc)) peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.SendTxs(reqID, cost, ll) } + return func() { peer.SendTxs(reqID, cost, enc) } }, } self.reqDist.queue(rq) -- cgit From c8d8126bd0c4cc091978ba6dc8c656452f97ed4a Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 10 Apr 2019 11:01:54 +0200 Subject: les: check required message types in cost table --- les/handler.go | 6 +++++- les/peer.go | 28 +++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) (limited to 'les') diff --git a/les/handler.go b/les/handler.go index 19ccbcd2b..2fb2067dd 100644 --- a/les/handler.go +++ b/les/handler.go @@ -324,7 +324,11 @@ func (pm *ProtocolManager) handle(p *peer) error { } } -var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg} +var ( + reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg} + reqListV1 = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, GetHeaderProofsMsg} + reqListV2 = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, SendTxV2Msg, GetTxStatusMsg, GetProofsV2Msg, GetHelperTrieProofsMsg} +) // handleMsg is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. diff --git a/les/peer.go b/les/peer.go index bf097f666..5072010c5 100644 --- a/les/peer.go +++ b/les/peer.go @@ -168,7 +168,11 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { p.lock.RLock() defer p.lock.RUnlock() - cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) + costs := p.fcCosts[msgcode] + if costs == nil { + return 0 + } + cost := costs.baseCost + costs.reqCost*uint64(amount) if cost > p.fcServerParams.BufLimit { cost = p.fcServerParams.BufLimit } @@ -189,8 +193,12 @@ func (p *peer) GetTxRelayCost(amount, size int) uint64 { panic(nil) } - cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) - sizeCost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(size)/txSizeCostLimit + costs := p.fcCosts[msgcode] + if costs == nil { + return 0 + } + cost := costs.baseCost + costs.reqCost*uint64(amount) + sizeCost := costs.baseCost + costs.reqCost*uint64(size)/txSizeCostLimit if sizeCost > cost { cost = sizeCost } @@ -516,6 +524,20 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis p.fcServerParams = params p.fcServer = flowcontrol.NewServerNode(params) p.fcCosts = MRC.decode() + var checkList []uint64 + switch p.version { + case lpv1: + checkList = reqListV1 + case lpv2: + checkList = reqListV2 + default: + panic(nil) + } + for _, msgCode := range checkList { + if p.fcCosts[msgCode] == nil { + return errResp(ErrUselessPeer, "peer does not support message %d", msgCode) + } + } } p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} -- cgit