aboutsummaryrefslogtreecommitdiffstats
path: root/les
diff options
context:
space:
mode:
Diffstat (limited to 'les')
-rw-r--r--les/freeclient.go278
-rw-r--r--les/freeclient_test.go139
-rw-r--r--les/handler.go22
3 files changed, 438 insertions, 1 deletions
diff --git a/les/freeclient.go b/les/freeclient.go
new file mode 100644
index 000000000..5ee607be8
--- /dev/null
+++ b/les/freeclient.go
@@ -0,0 +1,278 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package les implements the Light Ethereum Subprotocol.
+package les
+
+import (
+ "io"
+ "math"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/common/prque"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// freeClientPool implements a client database that limits the connection time
+// of each client and manages accepting/rejecting incoming connections and even
+// kicking out some connected clients. The pool calculates recent usage time
+// for each known client (a value that increases linearly when the client is
+// connected and decreases exponentially when not connected). Clients with lower
+// recent usage are preferred, unknown nodes have the highest priority. Already
+// connected nodes receive a small bias in their favor in order to avoid accepting
+// and instantly kicking out clients.
+//
+// Note: the pool can use any string for client identification. Using signature
+// keys for that purpose would not make sense when being known has a negative
+// value for the client. Currently the LES protocol manager uses IP addresses
+// (without port address) to identify clients.
+type freeClientPool struct {
+ db ethdb.Database
+ lock sync.Mutex
+ clock mclock.Clock
+ closed bool
+
+ connectedLimit, totalLimit int
+
+ addressMap map[string]*freeClientPoolEntry
+ connPool, disconnPool *prque.Prque
+ startupTime mclock.AbsTime
+ logOffsetAtStartup int64
+}
+
+const (
+ recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage
+ fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
+ connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
+)
+
+// newFreeClientPool creates a new free client pool
+func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool {
+ pool := &freeClientPool{
+ db: db,
+ clock: clock,
+ addressMap: make(map[string]*freeClientPoolEntry),
+ connPool: prque.New(poolSetIndex),
+ disconnPool: prque.New(poolSetIndex),
+ connectedLimit: connectedLimit,
+ totalLimit: totalLimit,
+ }
+ pool.loadFromDb()
+ return pool
+}
+
+func (f *freeClientPool) stop() {
+ f.lock.Lock()
+ f.closed = true
+ f.saveToDb()
+ f.lock.Unlock()
+}
+
+// connect should be called after a successful handshake. If the connection was
+// rejected, there is no need to call disconnect.
+//
+// Note: the disconnectFn callback should not block.
+func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.closed {
+ return false
+ }
+ e := f.addressMap[address]
+ now := f.clock.Now()
+ var recentUsage int64
+ if e == nil {
+ e = &freeClientPoolEntry{address: address, index: -1}
+ f.addressMap[address] = e
+ } else {
+ if e.connected {
+ log.Debug("Client already connected", "address", address)
+ return false
+ }
+ recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier))
+ }
+ e.linUsage = recentUsage - int64(now)
+ // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool
+ if f.connPool.Size() == f.connectedLimit {
+ i := f.connPool.PopItem().(*freeClientPoolEntry)
+ if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
+ // kick it out and accept the new client
+ f.connPool.Remove(i.index)
+ f.calcLogUsage(i, now)
+ i.connected = false
+ f.disconnPool.Push(i, -i.logUsage)
+ log.Debug("Client kicked out", "address", i.address)
+ i.disconnectFn()
+ } else {
+ // keep the old client and reject the new one
+ f.connPool.Push(i, i.linUsage)
+ log.Debug("Client rejected", "address", address)
+ return false
+ }
+ }
+ f.disconnPool.Remove(e.index)
+ e.connected = true
+ e.disconnectFn = disconnectFn
+ f.connPool.Push(e, e.linUsage)
+ if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
+ f.disconnPool.Pop()
+ }
+ log.Debug("Client accepted", "address", address)
+ return true
+}
+
+// disconnect should be called when a connection is terminated. If the disconnection
+// was initiated by the pool itself using disconnectFn then calling disconnect is
+// not necessary but permitted.
+func (f *freeClientPool) disconnect(address string) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ if f.closed {
+ return
+ }
+ e := f.addressMap[address]
+ now := f.clock.Now()
+ if !e.connected {
+ log.Debug("Client already disconnected", "address", address)
+ return
+ }
+
+ f.connPool.Remove(e.index)
+ f.calcLogUsage(e, now)
+ e.connected = false
+ f.disconnPool.Push(e, -e.logUsage)
+ log.Debug("Client disconnected", "address", address)
+}
+
+// logOffset calculates the time-dependent offset for the logarithmic
+// representation of recent usage
+func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 {
+ // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor
+ // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier.
+ logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier))
+ return f.logOffsetAtStartup + logDecay
+}
+
+// calcLogUsage converts recent usage from linear to logarithmic representation
+// when disconnecting a peer or closing the client pool
+func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) {
+ dt := e.linUsage + int64(now)
+ if dt < 1 {
+ dt = 1
+ }
+ e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now)
+}
+
+// freeClientPoolStorage is the RLP representation of the pool's database storage
+type freeClientPoolStorage struct {
+ LogOffset uint64
+ List []*freeClientPoolEntry
+}
+
+// loadFromDb restores pool status from the database storage
+// (automatically called at initialization)
+func (f *freeClientPool) loadFromDb() {
+ enc, err := f.db.Get([]byte("freeClientPool"))
+ if err != nil {
+ return
+ }
+ var storage freeClientPoolStorage
+ err = rlp.DecodeBytes(enc, &storage)
+ if err != nil {
+ log.Error("Failed to decode client list", "err", err)
+ return
+ }
+ f.logOffsetAtStartup = int64(storage.LogOffset)
+ f.startupTime = f.clock.Now()
+ for _, e := range storage.List {
+ log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage)
+ f.addressMap[e.address] = e
+ f.disconnPool.Push(e, -e.logUsage)
+ }
+}
+
+// saveToDb saves pool status to the database storage
+// (automatically called during shutdown)
+func (f *freeClientPool) saveToDb() {
+ now := f.clock.Now()
+ storage := freeClientPoolStorage{
+ LogOffset: uint64(f.logOffset(now)),
+ List: make([]*freeClientPoolEntry, len(f.addressMap)),
+ }
+ i := 0
+ for _, e := range f.addressMap {
+ if e.connected {
+ f.calcLogUsage(e, now)
+ }
+ storage.List[i] = e
+ i++
+ }
+ enc, err := rlp.EncodeToBytes(storage)
+ if err != nil {
+ log.Error("Failed to encode client list", "err", err)
+ } else {
+ f.db.Put([]byte("freeClientPool"), enc)
+ }
+}
+
+// freeClientPoolEntry represents a client address known by the pool.
+// When connected, recent usage is calculated as linUsage + int64(clock.Now())
+// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset
+// also grows linearly with time while the server is running.
+// Conversion between linear and logarithmic representation happens when connecting
+// or disconnecting the node.
+//
+// Note: linUsage and logUsage are values used with constantly growing offsets so
+// even though they are close to each other at any time they may wrap around int64
+// limits over time. Comparison should be performed accordingly.
+type freeClientPoolEntry struct {
+ address string
+ connected bool
+ disconnectFn func()
+ linUsage, logUsage int64
+ index int
+}
+
+func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error {
+ return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)})
+}
+
+func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error {
+ var entry struct {
+ Address string
+ LogUsage uint64
+ }
+ if err := s.Decode(&entry); err != nil {
+ return err
+ }
+ e.address = entry.Address
+ e.logUsage = int64(entry.LogUsage)
+ e.connected = false
+ e.index = -1
+ return nil
+}
+
+// poolSetIndex callback is used by both priority queues to set/update the index of
+// the element in the queue. Index is needed to remove elements other than the top one.
+func poolSetIndex(a interface{}, i int) {
+ a.(*freeClientPoolEntry).index = i
+}
diff --git a/les/freeclient_test.go b/les/freeclient_test.go
new file mode 100644
index 000000000..e95abc7aa
--- /dev/null
+++ b/les/freeclient_test.go
@@ -0,0 +1,139 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/ethdb"
+)
+
+func TestFreeClientPoolL10C100(t *testing.T) {
+ testFreeClientPool(t, 10, 100)
+}
+
+func TestFreeClientPoolL40C200(t *testing.T) {
+ testFreeClientPool(t, 40, 200)
+}
+
+func TestFreeClientPoolL100C300(t *testing.T) {
+ testFreeClientPool(t, 100, 300)
+}
+
+const testFreeClientPoolTicks = 500000
+
+func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
+ var (
+ clock mclock.Simulated
+ db = ethdb.NewMemDatabase()
+ pool = newFreeClientPool(db, connLimit, 10000, &clock)
+ connected = make([]bool, clientCount)
+ connTicks = make([]int, clientCount)
+ disconnCh = make(chan int, clientCount)
+ )
+ peerId := func(i int) string {
+ return fmt.Sprintf("test peer #%d", i)
+ }
+ disconnFn := func(i int) func() {
+ return func() {
+ disconnCh <- i
+ }
+ }
+
+ // pool should accept new peers up to its connected limit
+ for i := 0; i < connLimit; i++ {
+ if pool.connect(peerId(i), disconnFn(i)) {
+ connected[i] = true
+ } else {
+ t.Fatalf("Test peer #%d rejected", i)
+ }
+ }
+ // since all accepted peers are new and should not be kicked out, the next one should be rejected
+ if pool.connect(peerId(connLimit), disconnFn(connLimit)) {
+ connected[connLimit] = true
+ t.Fatalf("Peer accepted over connected limit")
+ }
+
+ // randomly connect and disconnect peers, expect to have a similar total connection time at the end
+ for tickCounter := 0; tickCounter < testFreeClientPoolTicks; tickCounter++ {
+ clock.Run(1 * time.Second)
+
+ i := rand.Intn(clientCount)
+ if connected[i] {
+ pool.disconnect(peerId(i))
+ connected[i] = false
+ connTicks[i] += tickCounter
+ } else {
+ if pool.connect(peerId(i), disconnFn(i)) {
+ connected[i] = true
+ connTicks[i] -= tickCounter
+ }
+ }
+ pollDisconnects:
+ for {
+ select {
+ case i := <-disconnCh:
+ pool.disconnect(peerId(i))
+ if connected[i] {
+ connTicks[i] += tickCounter
+ connected[i] = false
+ }
+ default:
+ break pollDisconnects
+ }
+ }
+ }
+
+ expTicks := testFreeClientPoolTicks * connLimit / clientCount
+ expMin := expTicks - expTicks/10
+ expMax := expTicks + expTicks/10
+
+ // check if the total connected time of peers are all in the expected range
+ for i, c := range connected {
+ if c {
+ connTicks[i] += testFreeClientPoolTicks
+ }
+ if connTicks[i] < expMin || connTicks[i] > expMax {
+ t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], expMin, expMax)
+ }
+ }
+
+ // a previously unknown peer should be accepted now
+ if !pool.connect("newPeer", func() {}) {
+ t.Fatalf("Previously unknown peer rejected")
+ }
+
+ // close and restart pool
+ pool.stop()
+ pool = newFreeClientPool(db, connLimit, 10000, &clock)
+
+ // try connecting all known peers (connLimit should be filled up)
+ for i := 0; i < clientCount; i++ {
+ pool.connect(peerId(i), func() {})
+ }
+ // expect pool to remember known nodes and kick out one of them to accept a new one
+ if !pool.connect("newPeer2", func() {}) {
+ t.Errorf("Previously unknown peer rejected after restarting pool")
+ }
+ pool.stop()
+}
diff --git a/les/handler.go b/les/handler.go
index 2fc4cde34..91a235bf0 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -28,6 +28,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -104,6 +105,7 @@ type ProtocolManager struct {
odr *LesOdr
server *LesServer
serverPool *serverPool
+ clientPool *freeClientPool
lesTopic discv5.Topic
reqDist *requestDistributor
retriever *retrieveManager
@@ -226,6 +228,7 @@ func (pm *ProtocolManager) Start(maxPeers int) {
if pm.lightSync {
go pm.syncer()
} else {
+ pm.clientPool = newFreeClientPool(pm.chainDb, maxPeers, 10000, mclock.System{})
go func() {
for range pm.newPeerCh {
}
@@ -243,6 +246,9 @@ func (pm *ProtocolManager) Stop() {
pm.noMorePeers <- struct{}{}
close(pm.quitSync) // quits syncer, fetcher
+ if pm.clientPool != nil {
+ pm.clientPool.stop()
+ }
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
@@ -264,7 +270,8 @@ func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgRea
// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
// Ignore maxPeers if this is a trusted peer
- if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
+ // In server mode we try to check into the client pool after handshake
+ if pm.lightSync && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}
@@ -282,6 +289,19 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
+
+ if !pm.lightSync && !p.Peer.Info().Network.Trusted {
+ addr, ok := p.RemoteAddr().(*net.TCPAddr)
+ // test peer address is not a tcp address, don't use client pool if can not typecast
+ if ok {
+ id := addr.IP.String()
+ if !pm.clientPool.connect(id, func() { go pm.removePeer(p.id) }) {
+ return p2p.DiscTooManyPeers
+ }
+ defer pm.clientPool.disconnect(id)
+ }
+ }
+
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}