Diffstat (limited to 'les/flowcontrol/control.go')
-rw-r--r-- | les/flowcontrol/control.go | 121
1 file changed, 93 insertions, 28 deletions
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go
index acb131ea4..e45537cf5 100644
--- a/les/flowcontrol/control.go
+++ b/les/flowcontrol/control.go
@@ -24,7 +24,7 @@ import (
 	"github.com/ethereum/go-ethereum/common/mclock"
 )
 
-const fcTimeConst = 1000000
+const fcTimeConst = time.Millisecond
 
 type ServerParams struct {
 	BufLimit, MinRecharge uint64
@@ -33,7 +33,7 @@ type ServerParams struct {
 type ClientNode struct {
 	params   *ServerParams
 	bufValue uint64
-	lastTime int64
+	lastTime mclock.AbsTime
 	lock     sync.Mutex
 	cm       *ClientManager
 	cmNode   *cmNode
@@ -44,7 +44,7 @@ func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode {
 		cm:       cm,
 		params:   params,
 		bufValue: params.BufLimit,
-		lastTime: getTime(),
+		lastTime: mclock.Now(),
 	}
 	node.cmNode = cm.addNode(node)
 	return node
@@ -54,12 +54,12 @@ func (peer *ClientNode) Remove(cm *ClientManager) {
 	cm.removeNode(peer.cmNode)
 }
 
-func (peer *ClientNode) recalcBV(time int64) {
+func (peer *ClientNode) recalcBV(time mclock.AbsTime) {
 	dt := uint64(time - peer.lastTime)
 	if time < peer.lastTime {
 		dt = 0
 	}
-	peer.bufValue += peer.params.MinRecharge * dt / fcTimeConst
+	peer.bufValue += peer.params.MinRecharge * dt / uint64(fcTimeConst)
 	if peer.bufValue > peer.params.BufLimit {
 		peer.bufValue = peer.params.BufLimit
 	}
@@ -70,7 +70,7 @@ func (peer *ClientNode) AcceptRequest() (uint64, bool) {
 	peer.lock.Lock()
 	defer peer.lock.Unlock()
 
-	time := getTime()
+	time := mclock.Now()
 	peer.recalcBV(time)
 	return peer.bufValue, peer.cm.accept(peer.cmNode, time)
 }
@@ -79,7 +79,7 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
 	peer.lock.Lock()
 	defer peer.lock.Unlock()
 
-	time := getTime()
+	time := mclock.Now()
 	peer.recalcBV(time)
 	peer.bufValue -= cost
 	peer.recalcBV(time)
@@ -94,66 +94,127 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
 }
 
 type ServerNode struct {
-	bufEstimate uint64
-	lastTime    int64
-	params      *ServerParams
-	sumCost     uint64            // sum of req costs sent to this server
-	pending     map[uint64]uint64 // value = sumCost after sending the given req
-	lock        sync.RWMutex
+	bufEstimate     uint64
+	lastTime        mclock.AbsTime
+	params          *ServerParams
+	sumCost         uint64            // sum of req costs sent to this server
+	pending         map[uint64]uint64 // value = sumCost after sending the given req
+	assignedRequest uint64            // when != 0, only the request with the given ID can be sent to this peer
+	assignToken     chan struct{}     // send to this channel before assigning, read from it after deassigning
+	lock            sync.RWMutex
 }
 
 func NewServerNode(params *ServerParams) *ServerNode {
 	return &ServerNode{
 		bufEstimate: params.BufLimit,
-		lastTime:    getTime(),
+		lastTime:    mclock.Now(),
 		params:      params,
 		pending:     make(map[uint64]uint64),
+		assignToken: make(chan struct{}, 1),
 	}
 }
 
-func getTime() int64 {
-	return int64(mclock.Now())
-}
-
-func (peer *ServerNode) recalcBLE(time int64) {
+func (peer *ServerNode) recalcBLE(time mclock.AbsTime) {
 	dt := uint64(time - peer.lastTime)
 	if time < peer.lastTime {
 		dt = 0
 	}
-	peer.bufEstimate += peer.params.MinRecharge * dt / fcTimeConst
+	peer.bufEstimate += peer.params.MinRecharge * dt / uint64(fcTimeConst)
 	if peer.bufEstimate > peer.params.BufLimit {
 		peer.bufEstimate = peer.params.BufLimit
 	}
 	peer.lastTime = time
 }
 
-func (peer *ServerNode) canSend(maxCost uint64) uint64 {
+// safetyMargin is added to the flow control waiting time when estimated buffer value is low
+const safetyMargin = time.Millisecond * 200
+
+func (peer *ServerNode) canSend(maxCost uint64) time.Duration {
+	maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst)
+	if maxCost > peer.params.BufLimit {
+		maxCost = peer.params.BufLimit
+	}
 	if peer.bufEstimate >= maxCost {
 		return 0
 	}
-	return (maxCost - peer.bufEstimate) * fcTimeConst / peer.params.MinRecharge
+	return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge)
 }
 
-func (peer *ServerNode) CanSend(maxCost uint64) uint64 {
+// CanSend returns the minimum waiting time required before sending a request
+// with the given maximum estimated cost
+func (peer *ServerNode) CanSend(maxCost uint64) time.Duration {
 	peer.lock.RLock()
 	defer peer.lock.RUnlock()
 
 	return peer.canSend(maxCost)
 }
 
+// AssignRequest tries to assign the server node to the given request, guaranteeing
+// that once it returns true, no request will be sent to the node before this one
+func (peer *ServerNode) AssignRequest(reqID uint64) bool {
+	select {
+	case peer.assignToken <- struct{}{}:
+	default:
+		return false
+	}
+	peer.lock.Lock()
+	peer.assignedRequest = reqID
+	peer.lock.Unlock()
+	return true
+}
+
+// MustAssignRequest waits until the node can be assigned to the given request.
+// It is always guaranteed that assignments are released in a short amount of time.
+func (peer *ServerNode) MustAssignRequest(reqID uint64) {
+	peer.assignToken <- struct{}{}
+	peer.lock.Lock()
+	peer.assignedRequest = reqID
+	peer.lock.Unlock()
+}
+
+// DeassignRequest releases a request assignment in case the planned request
+// is not being sent.
+func (peer *ServerNode) DeassignRequest(reqID uint64) {
+	peer.lock.Lock()
+	if peer.assignedRequest == reqID {
+		peer.assignedRequest = 0
+		<-peer.assignToken
+	}
+	peer.lock.Unlock()
+}
+
+// IsAssigned returns true if the server node has already been assigned to a request
+// (note that this function returning false does not guarantee that you can assign a request
+// immediately afterwards, its only purpose is to help peer selection)
+func (peer *ServerNode) IsAssigned() bool {
+	peer.lock.RLock()
+	locked := peer.assignedRequest != 0
+	peer.lock.RUnlock()
+	return locked
+}
+
 // blocks until request can be sent
 func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
 	peer.lock.Lock()
 	defer peer.lock.Unlock()
 
-	peer.recalcBLE(getTime())
-	for peer.bufEstimate < maxCost {
-		wait := time.Duration(peer.canSend(maxCost))
+	if peer.assignedRequest != reqID {
+		peer.lock.Unlock()
+		peer.MustAssignRequest(reqID)
+		peer.lock.Lock()
+	}
+
+	peer.recalcBLE(mclock.Now())
+	wait := peer.canSend(maxCost)
+	for wait > 0 {
 		peer.lock.Unlock()
 		time.Sleep(wait)
 		peer.lock.Lock()
-		peer.recalcBLE(getTime())
+		peer.recalcBLE(mclock.Now())
+		wait = peer.canSend(maxCost)
 	}
+	peer.assignedRequest = 0
+	<-peer.assignToken
 	peer.bufEstimate -= maxCost
 	peer.sumCost += maxCost
 	if reqID >= 0 {
@@ -162,14 +223,18 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
 }
 
 func (peer *ServerNode) GotReply(reqID, bv uint64) {
+
 	peer.lock.Lock()
 	defer peer.lock.Unlock()
 
+	if bv > peer.params.BufLimit {
+		bv = peer.params.BufLimit
+	}
 	sc, ok := peer.pending[reqID]
 	if !ok {
 		return
 	}
 	delete(peer.pending, reqID)
 	peer.bufEstimate = bv - (peer.sumCost - sc)
-	peer.lastTime = getTime()
+	peer.lastTime = mclock.Now()
 }
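
For context, here is a minimal usage sketch (not part of the patch) of how a client-side caller might drive the request-assignment API this change adds to ServerNode. The buffer parameters, request ID, cost value, and the one-second wait threshold are arbitrary illustrative assumptions; error handling and the actual network send are omitted:

package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/les/flowcontrol"
)

func main() {
	// Illustrative flow control parameters; real values are announced by the server.
	params := &flowcontrol.ServerParams{BufLimit: 300000, MinRecharge: 50000}
	server := flowcontrol.NewServerNode(params)

	reqID, maxCost := uint64(1), uint64(10000)

	// Try to reserve the peer without blocking; if another request already
	// holds the assignment token, a caller would pick a different peer.
	if !server.AssignRequest(reqID) {
		fmt.Println("peer busy, try another server")
		return
	}

	// CanSend now returns a waiting time (including the safety margin); if it
	// is too long, release the assignment instead of sending.
	if wait := server.CanSend(maxCost); wait > time.Second {
		server.DeassignRequest(reqID)
		return
	}

	// Blocks until the buffer estimate covers maxCost, deducts it, and
	// releases the assignment token.
	server.SendRequest(reqID, maxCost)

	// ... encode and send the request on the wire, wait for the reply ...

	// Feed the buffer value reported in the reply back into the estimator
	// (290000 is a made-up value for the example).
	server.GotReply(reqID, 290000)
}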