diff options
Diffstat (limited to 'les/freeclient.go')
-rw-r--r-- | les/freeclient.go | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/les/freeclient.go b/les/freeclient.go new file mode 100644 index 000000000..5ee607be8 --- /dev/null +++ b/les/freeclient.go @@ -0,0 +1,278 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package les implements the Light Ethereum Subprotocol. +package les + +import ( + "io" + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// freeClientPool implements a client database that limits the connection time +// of each client and manages accepting/rejecting incoming connections and even +// kicking out some connected clients. The pool calculates recent usage time +// for each known client (a value that increases linearly when the client is +// connected and decreases exponentially when not connected). Clients with lower +// recent usage are preferred, unknown nodes have the highest priority. Already +// connected nodes receive a small bias in their favor in order to avoid accepting +// and instantly kicking out clients. +// +// Note: the pool can use any string for client identification. Using signature +// keys for that purpose would not make sense when being known has a negative +// value for the client. Currently the LES protocol manager uses IP addresses +// (without port address) to identify clients. +type freeClientPool struct { + db ethdb.Database + lock sync.Mutex + clock mclock.Clock + closed bool + + connectedLimit, totalLimit int + + addressMap map[string]*freeClientPoolEntry + connPool, disconnPool *prque.Prque + startupTime mclock.AbsTime + logOffsetAtStartup int64 +} + +const ( + recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage + fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format + connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon +) + +// newFreeClientPool creates a new free client pool +func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool { + pool := &freeClientPool{ + db: db, + clock: clock, + addressMap: make(map[string]*freeClientPoolEntry), + connPool: prque.New(poolSetIndex), + disconnPool: prque.New(poolSetIndex), + connectedLimit: connectedLimit, + totalLimit: totalLimit, + } + pool.loadFromDb() + return pool +} + +func (f *freeClientPool) stop() { + f.lock.Lock() + f.closed = true + f.saveToDb() + f.lock.Unlock() +} + +// connect should be called after a successful handshake. If the connection was +// rejected, there is no need to call disconnect. +// +// Note: the disconnectFn callback should not block. +func (f *freeClientPool) connect(address string, disconnectFn func()) bool { + f.lock.Lock() + defer f.lock.Unlock() + + if f.closed { + return false + } + e := f.addressMap[address] + now := f.clock.Now() + var recentUsage int64 + if e == nil { + e = &freeClientPoolEntry{address: address, index: -1} + f.addressMap[address] = e + } else { + if e.connected { + log.Debug("Client already connected", "address", address) + return false + } + recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier)) + } + e.linUsage = recentUsage - int64(now) + // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool + if f.connPool.Size() == f.connectedLimit { + i := f.connPool.PopItem().(*freeClientPoolEntry) + if e.linUsage+int64(connectedBias)-i.linUsage < 0 { + // kick it out and accept the new client + f.connPool.Remove(i.index) + f.calcLogUsage(i, now) + i.connected = false + f.disconnPool.Push(i, -i.logUsage) + log.Debug("Client kicked out", "address", i.address) + i.disconnectFn() + } else { + // keep the old client and reject the new one + f.connPool.Push(i, i.linUsage) + log.Debug("Client rejected", "address", address) + return false + } + } + f.disconnPool.Remove(e.index) + e.connected = true + e.disconnectFn = disconnectFn + f.connPool.Push(e, e.linUsage) + if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit { + f.disconnPool.Pop() + } + log.Debug("Client accepted", "address", address) + return true +} + +// disconnect should be called when a connection is terminated. If the disconnection +// was initiated by the pool itself using disconnectFn then calling disconnect is +// not necessary but permitted. +func (f *freeClientPool) disconnect(address string) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.closed { + return + } + e := f.addressMap[address] + now := f.clock.Now() + if !e.connected { + log.Debug("Client already disconnected", "address", address) + return + } + + f.connPool.Remove(e.index) + f.calcLogUsage(e, now) + e.connected = false + f.disconnPool.Push(e, -e.logUsage) + log.Debug("Client disconnected", "address", address) +} + +// logOffset calculates the time-dependent offset for the logarithmic +// representation of recent usage +func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 { + // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor + // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier. + logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier)) + return f.logOffsetAtStartup + logDecay +} + +// calcLogUsage converts recent usage from linear to logarithmic representation +// when disconnecting a peer or closing the client pool +func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) { + dt := e.linUsage + int64(now) + if dt < 1 { + dt = 1 + } + e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now) +} + +// freeClientPoolStorage is the RLP representation of the pool's database storage +type freeClientPoolStorage struct { + LogOffset uint64 + List []*freeClientPoolEntry +} + +// loadFromDb restores pool status from the database storage +// (automatically called at initialization) +func (f *freeClientPool) loadFromDb() { + enc, err := f.db.Get([]byte("freeClientPool")) + if err != nil { + return + } + var storage freeClientPoolStorage + err = rlp.DecodeBytes(enc, &storage) + if err != nil { + log.Error("Failed to decode client list", "err", err) + return + } + f.logOffsetAtStartup = int64(storage.LogOffset) + f.startupTime = f.clock.Now() + for _, e := range storage.List { + log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage) + f.addressMap[e.address] = e + f.disconnPool.Push(e, -e.logUsage) + } +} + +// saveToDb saves pool status to the database storage +// (automatically called during shutdown) +func (f *freeClientPool) saveToDb() { + now := f.clock.Now() + storage := freeClientPoolStorage{ + LogOffset: uint64(f.logOffset(now)), + List: make([]*freeClientPoolEntry, len(f.addressMap)), + } + i := 0 + for _, e := range f.addressMap { + if e.connected { + f.calcLogUsage(e, now) + } + storage.List[i] = e + i++ + } + enc, err := rlp.EncodeToBytes(storage) + if err != nil { + log.Error("Failed to encode client list", "err", err) + } else { + f.db.Put([]byte("freeClientPool"), enc) + } +} + +// freeClientPoolEntry represents a client address known by the pool. +// When connected, recent usage is calculated as linUsage + int64(clock.Now()) +// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset +// also grows linearly with time while the server is running. +// Conversion between linear and logarithmic representation happens when connecting +// or disconnecting the node. +// +// Note: linUsage and logUsage are values used with constantly growing offsets so +// even though they are close to each other at any time they may wrap around int64 +// limits over time. Comparison should be performed accordingly. +type freeClientPoolEntry struct { + address string + connected bool + disconnectFn func() + linUsage, logUsage int64 + index int +} + +func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)}) +} + +func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error { + var entry struct { + Address string + LogUsage uint64 + } + if err := s.Decode(&entry); err != nil { + return err + } + e.address = entry.Address + e.logUsage = int64(entry.LogUsage) + e.connected = false + e.index = -1 + return nil +} + +// poolSetIndex callback is used by both priority queues to set/update the index of +// the element in the queue. Index is needed to remove elements other than the top one. +func poolSetIndex(a interface{}, i int) { + a.(*freeClientPoolEntry).index = i +} |