aboutsummaryrefslogtreecommitdiffstats
path: root/les/freeclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/freeclient.go')
-rw-r--r--les/freeclient.go278
1 files changed, 278 insertions, 0 deletions
diff --git a/les/freeclient.go b/les/freeclient.go
new file mode 100644
index 000000000..5ee607be8
--- /dev/null
+++ b/les/freeclient.go
@@ -0,0 +1,278 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package les implements the Light Ethereum Subprotocol.
+package les
+
+import (
+ "io"
+ "math"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// freeClientPool implements a client database that limits the connection time
+// of each client and manages accepting/rejecting incoming connections and even
+// kicking out some connected clients. The pool calculates recent usage time
+// for each known client (a value that increases linearly when the client is
+// connected and decreases exponentially when not connected). Clients with lower
+// recent usage are preferred, unknown nodes have the highest priority. Already
+// connected nodes receive a small bias in their favor in order to avoid accepting
+// and instantly kicking out clients.
+//
+// Note: the pool can use any string for client identification. Using signature
+// keys for that purpose would not make sense when being known has a negative
+// value for the client. Currently the LES protocol manager uses IP addresses
+// (without port address) to identify clients.
+type freeClientPool struct {
+ db ethdb.Database
+ lock sync.Mutex
+ clock mclock.Clock
+ closed bool
+
+ connectedLimit, totalLimit int
+
+ addressMap map[string]*freeClientPoolEntry
+ connPool, disconnPool *prque.Prque
+ startupTime mclock.AbsTime
+ logOffsetAtStartup int64
+}
+
+const (
+ recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage
+ fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
+ connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
+)
+
+// newFreeClientPool creates a new free client pool
+func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool {
+ pool := &freeClientPool{
+ db: db,
+ clock: clock,
+ addressMap: make(map[string]*freeClientPoolEntry),
+ connPool: prque.New(poolSetIndex),
+ disconnPool: prque.New(poolSetIndex),
+ connectedLimit: connectedLimit,
+ totalLimit: totalLimit,
+ }
+ pool.loadFromDb()
+ return pool
+}
+
+func (f *freeClientPool) stop() {
+ f.lock.Lock()
+ f.closed = true
+ f.saveToDb()
+ f.lock.Unlock()
+}
+
+// connect should be called after a successful handshake. If the connection was
+// rejected, there is no need to call disconnect.
+//
+// Note: the disconnectFn callback should not block.
+func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.closed {
+ return false
+ }
+ e := f.addressMap[address]
+ now := f.clock.Now()
+ var recentUsage int64
+ if e == nil {
+ e = &freeClientPoolEntry{address: address, index: -1}
+ f.addressMap[address] = e
+ } else {
+ if e.connected {
+ log.Debug("Client already connected", "address", address)
+ return false
+ }
+ recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier))
+ }
+ e.linUsage = recentUsage - int64(now)
+ // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool
+ if f.connPool.Size() == f.connectedLimit {
+ i := f.connPool.PopItem().(*freeClientPoolEntry)
+ if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
+ // kick it out and accept the new client
+ f.connPool.Remove(i.index)
+ f.calcLogUsage(i, now)
+ i.connected = false
+ f.disconnPool.Push(i, -i.logUsage)
+ log.Debug("Client kicked out", "address", i.address)
+ i.disconnectFn()
+ } else {
+ // keep the old client and reject the new one
+ f.connPool.Push(i, i.linUsage)
+ log.Debug("Client rejected", "address", address)
+ return false
+ }
+ }
+ f.disconnPool.Remove(e.index)
+ e.connected = true
+ e.disconnectFn = disconnectFn
+ f.connPool.Push(e, e.linUsage)
+ if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
+ f.disconnPool.Pop()
+ }
+ log.Debug("Client accepted", "address", address)
+ return true
+}
+
+// disconnect should be called when a connection is terminated. If the disconnection
+// was initiated by the pool itself using disconnectFn then calling disconnect is
+// not necessary but permitted.
+func (f *freeClientPool) disconnect(address string) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.closed {
+ return
+ }
+ e := f.addressMap[address]
+ now := f.clock.Now()
+ if !e.connected {
+ log.Debug("Client already disconnected", "address", address)
+ return
+ }
+
+ f.connPool.Remove(e.index)
+ f.calcLogUsage(e, now)
+ e.connected = false
+ f.disconnPool.Push(e, -e.logUsage)
+ log.Debug("Client disconnected", "address", address)
+}
+
+// logOffset calculates the time-dependent offset for the logarithmic
+// representation of recent usage
+func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 {
+ // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor
+ // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier.
+ logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier))
+ return f.logOffsetAtStartup + logDecay
+}
+
+// calcLogUsage converts recent usage from linear to logarithmic representation
+// when disconnecting a peer or closing the client pool
+func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) {
+ dt := e.linUsage + int64(now)
+ if dt < 1 {
+ dt = 1
+ }
+ e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now)
+}
+
+// freeClientPoolStorage is the RLP representation of the pool's database storage
+type freeClientPoolStorage struct {
+ LogOffset uint64
+ List []*freeClientPoolEntry
+}
+
+// loadFromDb restores pool status from the database storage
+// (automatically called at initialization)
+func (f *freeClientPool) loadFromDb() {
+ enc, err := f.db.Get([]byte("freeClientPool"))
+ if err != nil {
+ return
+ }
+ var storage freeClientPoolStorage
+ err = rlp.DecodeBytes(enc, &storage)
+ if err != nil {
+ log.Error("Failed to decode client list", "err", err)
+ return
+ }
+ f.logOffsetAtStartup = int64(storage.LogOffset)
+ f.startupTime = f.clock.Now()
+ for _, e := range storage.List {
+ log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage)
+ f.addressMap[e.address] = e
+ f.disconnPool.Push(e, -e.logUsage)
+ }
+}
+
+// saveToDb saves pool status to the database storage
+// (automatically called during shutdown)
+func (f *freeClientPool) saveToDb() {
+ now := f.clock.Now()
+ storage := freeClientPoolStorage{
+ LogOffset: uint64(f.logOffset(now)),
+ List: make([]*freeClientPoolEntry, len(f.addressMap)),
+ }
+ i := 0
+ for _, e := range f.addressMap {
+ if e.connected {
+ f.calcLogUsage(e, now)
+ }
+ storage.List[i] = e
+ i++
+ }
+ enc, err := rlp.EncodeToBytes(storage)
+ if err != nil {
+ log.Error("Failed to encode client list", "err", err)
+ } else {
+ f.db.Put([]byte("freeClientPool"), enc)
+ }
+}
+
+// freeClientPoolEntry represents a client address known by the pool.
+// When connected, recent usage is calculated as linUsage + int64(clock.Now())
+// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset
+// also grows linearly with time while the server is running.
+// Conversion between linear and logarithmic representation happens when connecting
+// or disconnecting the node.
+//
+// Note: linUsage and logUsage are values used with constantly growing offsets so
+// even though they are close to each other at any time they may wrap around int64
+// limits over time. Comparison should be performed accordingly.
+type freeClientPoolEntry struct {
+ address string
+ connected bool
+ disconnectFn func()
+ linUsage, logUsage int64
+ index int
+}
+
+func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error {
+ return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)})
+}
+
+func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error {
+ var entry struct {
+ Address string
+ LogUsage uint64
+ }
+ if err := s.Decode(&entry); err != nil {
+ return err
+ }
+ e.address = entry.Address
+ e.logUsage = int64(entry.LogUsage)
+ e.connected = false
+ e.index = -1
+ return nil
+}
+
+// poolSetIndex callback is used by both priority queues to set/update the index of
+// the element in the queue. Index is needed to remove elements other than the top one.
+func poolSetIndex(a interface{}, i int) {
+ a.(*freeClientPoolEntry).index = i
+}