diff options
-rw-r--r-- | common/mclock/mclock.go | 31 | ||||
-rw-r--r-- | common/mclock/simclock.go | 129 | ||||
-rwxr-xr-x | common/prque/prque.go | 57 | ||||
-rwxr-xr-x | common/prque/sstack.go | 106 | ||||
-rw-r--r-- | les/freeclient.go | 278 | ||||
-rw-r--r-- | les/freeclient_test.go | 139 | ||||
-rw-r--r-- | les/handler.go | 22 |
7 files changed, 761 insertions, 1 deletions
diff --git a/common/mclock/mclock.go b/common/mclock/mclock.go index 02608d17b..dcac59c6c 100644 --- a/common/mclock/mclock.go +++ b/common/mclock/mclock.go @@ -30,3 +30,34 @@ type AbsTime time.Duration func Now() AbsTime { return AbsTime(monotime.Now()) } + +// Add returns t + d. +func (t AbsTime) Add(d time.Duration) AbsTime { + return t + AbsTime(d) +} + +// Clock interface makes it possible to replace the monotonic system clock with +// a simulated clock. +type Clock interface { + Now() AbsTime + Sleep(time.Duration) + After(time.Duration) <-chan time.Time +} + +// System implements Clock using the system clock. +type System struct{} + +// Now implements Clock. +func (System) Now() AbsTime { + return AbsTime(monotime.Now()) +} + +// Sleep implements Clock. +func (System) Sleep(d time.Duration) { + time.Sleep(d) +} + +// After implements Clock. +func (System) After(d time.Duration) <-chan time.Time { + return time.After(d) +} diff --git a/common/mclock/simclock.go b/common/mclock/simclock.go new file mode 100644 index 000000000..e014f5615 --- /dev/null +++ b/common/mclock/simclock.go @@ -0,0 +1,129 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package mclock + +import ( + "sync" + "time" +) + +// Simulated implements a virtual Clock for reproducible time-sensitive tests. It +// simulates a scheduler on a virtual timescale where actual processing takes zero time. +// +// The virtual clock doesn't advance on its own, call Run to advance it and execute timers. +// Since there is no way to influence the Go scheduler, testing timeout behaviour involving +// goroutines needs special care. A good way to test such timeouts is as follows: First +// perform the action that is supposed to time out. Ensure that the timer you want to test +// is created. Then run the clock until after the timeout. Finally observe the effect of +// the timeout using a channel or semaphore. +type Simulated struct { + now AbsTime + scheduled []event + mu sync.RWMutex + cond *sync.Cond +} + +type event struct { + do func() + at AbsTime +} + +// Run moves the clock by the given duration, executing all timers before that duration. +func (s *Simulated) Run(d time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + s.init() + + end := s.now + AbsTime(d) + for len(s.scheduled) > 0 { + ev := s.scheduled[0] + if ev.at > end { + break + } + s.now = ev.at + ev.do() + s.scheduled = s.scheduled[1:] + } + s.now = end +} + +func (s *Simulated) ActiveTimers() int { + s.mu.RLock() + defer s.mu.RUnlock() + + return len(s.scheduled) +} + +func (s *Simulated) WaitForTimers(n int) { + s.mu.Lock() + defer s.mu.Unlock() + s.init() + + for len(s.scheduled) < n { + s.cond.Wait() + } +} + +// Now implements Clock. +func (s *Simulated) Now() AbsTime { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.now +} + +// Sleep implements Clock. +func (s *Simulated) Sleep(d time.Duration) { + <-s.After(d) +} + +// After implements Clock. +func (s *Simulated) After(d time.Duration) <-chan time.Time { + after := make(chan time.Time, 1) + s.insert(d, func() { + after <- (time.Time{}).Add(time.Duration(s.now)) + }) + return after +} + +func (s *Simulated) insert(d time.Duration, do func()) { + s.mu.Lock() + defer s.mu.Unlock() + s.init() + + at := s.now + AbsTime(d) + l, h := 0, len(s.scheduled) + ll := h + for l != h { + m := (l + h) / 2 + if at < s.scheduled[m].at { + h = m + } else { + l = m + 1 + } + } + s.scheduled = append(s.scheduled, event{}) + copy(s.scheduled[l+1:], s.scheduled[l:ll]) + s.scheduled[l] = event{do: do, at: at} + s.cond.Broadcast() +} + +func (s *Simulated) init() { + if s.cond == nil { + s.cond = sync.NewCond(&s.mu) + } +} diff --git a/common/prque/prque.go b/common/prque/prque.go new file mode 100755 index 000000000..9fd31a2e5 --- /dev/null +++ b/common/prque/prque.go @@ -0,0 +1,57 @@ +// This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque". + +package prque + +import ( + "container/heap" +) + +// Priority queue data structure. +type Prque struct { + cont *sstack +} + +// Creates a new priority queue. +func New(setIndex setIndexCallback) *Prque { + return &Prque{newSstack(setIndex)} +} + +// Pushes a value with a given priority into the queue, expanding if necessary. +func (p *Prque) Push(data interface{}, priority int64) { + heap.Push(p.cont, &item{data, priority}) +} + +// Pops the value with the greates priority off the stack and returns it. +// Currently no shrinking is done. +func (p *Prque) Pop() (interface{}, int64) { + item := heap.Pop(p.cont).(*item) + return item.value, item.priority +} + +// Pops only the item from the queue, dropping the associated priority value. +func (p *Prque) PopItem() interface{} { + return heap.Pop(p.cont).(*item).value +} + +// Remove removes the element with the given index. +func (p *Prque) Remove(i int) interface{} { + if i < 0 { + return nil + } + return heap.Remove(p.cont, i) +} + +// Checks whether the priority queue is empty. +func (p *Prque) Empty() bool { + return p.cont.Len() == 0 +} + +// Returns the number of element in the priority queue. +func (p *Prque) Size() int { + return p.cont.Len() +} + +// Clears the contents of the priority queue. +func (p *Prque) Reset() { + *p = *New(p.cont.setIndex) +} diff --git a/common/prque/sstack.go b/common/prque/sstack.go new file mode 100755 index 000000000..4875dae99 --- /dev/null +++ b/common/prque/sstack.go @@ -0,0 +1,106 @@ +// This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque". + +package prque + +// The size of a block of data +const blockSize = 4096 + +// A prioritized item in the sorted stack. +// +// Note: priorities can "wrap around" the int64 range, a comes before b if (a.priority - b.priority) > 0. +// The difference between the lowest and highest priorities in the queue at any point should be less than 2^63. +type item struct { + value interface{} + priority int64 +} + +// setIndexCallback is called when the element is moved to a new index. +// Providing setIndexCallback is optional, it is needed only if the application needs +// to delete elements other than the top one. +type setIndexCallback func(a interface{}, i int) + +// Internal sortable stack data structure. Implements the Push and Pop ops for +// the stack (heap) functionality and the Len, Less and Swap methods for the +// sortability requirements of the heaps. +type sstack struct { + setIndex setIndexCallback + size int + capacity int + offset int + + blocks [][]*item + active []*item +} + +// Creates a new, empty stack. +func newSstack(setIndex setIndexCallback) *sstack { + result := new(sstack) + result.setIndex = setIndex + result.active = make([]*item, blockSize) + result.blocks = [][]*item{result.active} + result.capacity = blockSize + return result +} + +// Pushes a value onto the stack, expanding it if necessary. Required by +// heap.Interface. +func (s *sstack) Push(data interface{}) { + if s.size == s.capacity { + s.active = make([]*item, blockSize) + s.blocks = append(s.blocks, s.active) + s.capacity += blockSize + s.offset = 0 + } else if s.offset == blockSize { + s.active = s.blocks[s.size/blockSize] + s.offset = 0 + } + if s.setIndex != nil { + s.setIndex(data.(*item).value, s.size) + } + s.active[s.offset] = data.(*item) + s.offset++ + s.size++ +} + +// Pops a value off the stack and returns it. Currently no shrinking is done. +// Required by heap.Interface. +func (s *sstack) Pop() (res interface{}) { + s.size-- + s.offset-- + if s.offset < 0 { + s.offset = blockSize - 1 + s.active = s.blocks[s.size/blockSize] + } + res, s.active[s.offset] = s.active[s.offset], nil + if s.setIndex != nil { + s.setIndex(res.(*item).value, -1) + } + return +} + +// Returns the length of the stack. Required by sort.Interface. +func (s *sstack) Len() int { + return s.size +} + +// Compares the priority of two elements of the stack (higher is first). +// Required by sort.Interface. +func (s *sstack) Less(i, j int) bool { + return (s.blocks[i/blockSize][i%blockSize].priority - s.blocks[j/blockSize][j%blockSize].priority) > 0 +} + +// Swaps two elements in the stack. Required by sort.Interface. +func (s *sstack) Swap(i, j int) { + ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize + a, b := s.blocks[jb][jo], s.blocks[ib][io] + if s.setIndex != nil { + s.setIndex(a.value, i) + s.setIndex(b.value, j) + } + s.blocks[ib][io], s.blocks[jb][jo] = a, b +} + +// Resets the stack, effectively clearing its contents. +func (s *sstack) Reset() { + *s = *newSstack(s.setIndex) +} diff --git a/les/freeclient.go b/les/freeclient.go new file mode 100644 index 000000000..5ee607be8 --- /dev/null +++ b/les/freeclient.go @@ -0,0 +1,278 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package les implements the Light Ethereum Subprotocol. +package les + +import ( + "io" + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// freeClientPool implements a client database that limits the connection time +// of each client and manages accepting/rejecting incoming connections and even +// kicking out some connected clients. The pool calculates recent usage time +// for each known client (a value that increases linearly when the client is +// connected and decreases exponentially when not connected). Clients with lower +// recent usage are preferred, unknown nodes have the highest priority. Already +// connected nodes receive a small bias in their favor in order to avoid accepting +// and instantly kicking out clients. +// +// Note: the pool can use any string for client identification. Using signature +// keys for that purpose would not make sense when being known has a negative +// value for the client. Currently the LES protocol manager uses IP addresses +// (without port address) to identify clients. +type freeClientPool struct { + db ethdb.Database + lock sync.Mutex + clock mclock.Clock + closed bool + + connectedLimit, totalLimit int + + addressMap map[string]*freeClientPoolEntry + connPool, disconnPool *prque.Prque + startupTime mclock.AbsTime + logOffsetAtStartup int64 +} + +const ( + recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage + fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format + connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon +) + +// newFreeClientPool creates a new free client pool +func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool { + pool := &freeClientPool{ + db: db, + clock: clock, + addressMap: make(map[string]*freeClientPoolEntry), + connPool: prque.New(poolSetIndex), + disconnPool: prque.New(poolSetIndex), + connectedLimit: connectedLimit, + totalLimit: totalLimit, + } + pool.loadFromDb() + return pool +} + +func (f *freeClientPool) stop() { + f.lock.Lock() + f.closed = true + f.saveToDb() + f.lock.Unlock() +} + +// connect should be called after a successful handshake. If the connection was +// rejected, there is no need to call disconnect. +// +// Note: the disconnectFn callback should not block. +func (f *freeClientPool) connect(address string, disconnectFn func()) bool { + f.lock.Lock() + defer f.lock.Unlock() + + if f.closed { + return false + } + e := f.addressMap[address] + now := f.clock.Now() + var recentUsage int64 + if e == nil { + e = &freeClientPoolEntry{address: address, index: -1} + f.addressMap[address] = e + } else { + if e.connected { + log.Debug("Client already connected", "address", address) + return false + } + recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier)) + } + e.linUsage = recentUsage - int64(now) + // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool + if f.connPool.Size() == f.connectedLimit { + i := f.connPool.PopItem().(*freeClientPoolEntry) + if e.linUsage+int64(connectedBias)-i.linUsage < 0 { + // kick it out and accept the new client + f.connPool.Remove(i.index) + f.calcLogUsage(i, now) + i.connected = false + f.disconnPool.Push(i, -i.logUsage) + log.Debug("Client kicked out", "address", i.address) + i.disconnectFn() + } else { + // keep the old client and reject the new one + f.connPool.Push(i, i.linUsage) + log.Debug("Client rejected", "address", address) + return false + } + } + f.disconnPool.Remove(e.index) + e.connected = true + e.disconnectFn = disconnectFn + f.connPool.Push(e, e.linUsage) + if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit { + f.disconnPool.Pop() + } + log.Debug("Client accepted", "address", address) + return true +} + +// disconnect should be called when a connection is terminated. If the disconnection +// was initiated by the pool itself using disconnectFn then calling disconnect is +// not necessary but permitted. +func (f *freeClientPool) disconnect(address string) { + f.lock.Lock() + defer f.lock.Unlock() + + if f.closed { + return + } + e := f.addressMap[address] + now := f.clock.Now() + if !e.connected { + log.Debug("Client already disconnected", "address", address) + return + } + + f.connPool.Remove(e.index) + f.calcLogUsage(e, now) + e.connected = false + f.disconnPool.Push(e, -e.logUsage) + log.Debug("Client disconnected", "address", address) +} + +// logOffset calculates the time-dependent offset for the logarithmic +// representation of recent usage +func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 { + // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor + // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier. + logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier)) + return f.logOffsetAtStartup + logDecay +} + +// calcLogUsage converts recent usage from linear to logarithmic representation +// when disconnecting a peer or closing the client pool +func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) { + dt := e.linUsage + int64(now) + if dt < 1 { + dt = 1 + } + e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now) +} + +// freeClientPoolStorage is the RLP representation of the pool's database storage +type freeClientPoolStorage struct { + LogOffset uint64 + List []*freeClientPoolEntry +} + +// loadFromDb restores pool status from the database storage +// (automatically called at initialization) +func (f *freeClientPool) loadFromDb() { + enc, err := f.db.Get([]byte("freeClientPool")) + if err != nil { + return + } + var storage freeClientPoolStorage + err = rlp.DecodeBytes(enc, &storage) + if err != nil { + log.Error("Failed to decode client list", "err", err) + return + } + f.logOffsetAtStartup = int64(storage.LogOffset) + f.startupTime = f.clock.Now() + for _, e := range storage.List { + log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage) + f.addressMap[e.address] = e + f.disconnPool.Push(e, -e.logUsage) + } +} + +// saveToDb saves pool status to the database storage +// (automatically called during shutdown) +func (f *freeClientPool) saveToDb() { + now := f.clock.Now() + storage := freeClientPoolStorage{ + LogOffset: uint64(f.logOffset(now)), + List: make([]*freeClientPoolEntry, len(f.addressMap)), + } + i := 0 + for _, e := range f.addressMap { + if e.connected { + f.calcLogUsage(e, now) + } + storage.List[i] = e + i++ + } + enc, err := rlp.EncodeToBytes(storage) + if err != nil { + log.Error("Failed to encode client list", "err", err) + } else { + f.db.Put([]byte("freeClientPool"), enc) + } +} + +// freeClientPoolEntry represents a client address known by the pool. +// When connected, recent usage is calculated as linUsage + int64(clock.Now()) +// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset +// also grows linearly with time while the server is running. +// Conversion between linear and logarithmic representation happens when connecting +// or disconnecting the node. +// +// Note: linUsage and logUsage are values used with constantly growing offsets so +// even though they are close to each other at any time they may wrap around int64 +// limits over time. Comparison should be performed accordingly. +type freeClientPoolEntry struct { + address string + connected bool + disconnectFn func() + linUsage, logUsage int64 + index int +} + +func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)}) +} + +func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error { + var entry struct { + Address string + LogUsage uint64 + } + if err := s.Decode(&entry); err != nil { + return err + } + e.address = entry.Address + e.logUsage = int64(entry.LogUsage) + e.connected = false + e.index = -1 + return nil +} + +// poolSetIndex callback is used by both priority queues to set/update the index of +// the element in the queue. Index is needed to remove elements other than the top one. +func poolSetIndex(a interface{}, i int) { + a.(*freeClientPoolEntry).index = i +} diff --git a/les/freeclient_test.go b/les/freeclient_test.go new file mode 100644 index 000000000..e95abc7aa --- /dev/null +++ b/les/freeclient_test.go @@ -0,0 +1,139 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package light implements on-demand retrieval capable state and chain objects +// for the Ethereum Light Client. +package les + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/ethdb" +) + +func TestFreeClientPoolL10C100(t *testing.T) { + testFreeClientPool(t, 10, 100) +} + +func TestFreeClientPoolL40C200(t *testing.T) { + testFreeClientPool(t, 40, 200) +} + +func TestFreeClientPoolL100C300(t *testing.T) { + testFreeClientPool(t, 100, 300) +} + +const testFreeClientPoolTicks = 500000 + +func testFreeClientPool(t *testing.T, connLimit, clientCount int) { + var ( + clock mclock.Simulated + db = ethdb.NewMemDatabase() + pool = newFreeClientPool(db, connLimit, 10000, &clock) + connected = make([]bool, clientCount) + connTicks = make([]int, clientCount) + disconnCh = make(chan int, clientCount) + ) + peerId := func(i int) string { + return fmt.Sprintf("test peer #%d", i) + } + disconnFn := func(i int) func() { + return func() { + disconnCh <- i + } + } + + // pool should accept new peers up to its connected limit + for i := 0; i < connLimit; i++ { + if pool.connect(peerId(i), disconnFn(i)) { + connected[i] = true + } else { + t.Fatalf("Test peer #%d rejected", i) + } + } + // since all accepted peers are new and should not be kicked out, the next one should be rejected + if pool.connect(peerId(connLimit), disconnFn(connLimit)) { + connected[connLimit] = true + t.Fatalf("Peer accepted over connected limit") + } + + // randomly connect and disconnect peers, expect to have a similar total connection time at the end + for tickCounter := 0; tickCounter < testFreeClientPoolTicks; tickCounter++ { + clock.Run(1 * time.Second) + + i := rand.Intn(clientCount) + if connected[i] { + pool.disconnect(peerId(i)) + connected[i] = false + connTicks[i] += tickCounter + } else { + if pool.connect(peerId(i), disconnFn(i)) { + connected[i] = true + connTicks[i] -= tickCounter + } + } + pollDisconnects: + for { + select { + case i := <-disconnCh: + pool.disconnect(peerId(i)) + if connected[i] { + connTicks[i] += tickCounter + connected[i] = false + } + default: + break pollDisconnects + } + } + } + + expTicks := testFreeClientPoolTicks * connLimit / clientCount + expMin := expTicks - expTicks/10 + expMax := expTicks + expTicks/10 + + // check if the total connected time of peers are all in the expected range + for i, c := range connected { + if c { + connTicks[i] += testFreeClientPoolTicks + } + if connTicks[i] < expMin || connTicks[i] > expMax { + t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], expMin, expMax) + } + } + + // a previously unknown peer should be accepted now + if !pool.connect("newPeer", func() {}) { + t.Fatalf("Previously unknown peer rejected") + } + + // close and restart pool + pool.stop() + pool = newFreeClientPool(db, connLimit, 10000, &clock) + + // try connecting all known peers (connLimit should be filled up) + for i := 0; i < clientCount; i++ { + pool.connect(peerId(i), func() {}) + } + // expect pool to remember known nodes and kick out one of them to accept a new one + if !pool.connect("newPeer2", func() {}) { + t.Errorf("Previously unknown peer rejected after restarting pool") + } + pool.stop() +} diff --git a/les/handler.go b/les/handler.go index 2fc4cde34..91a235bf0 100644 --- a/les/handler.go +++ b/les/handler.go @@ -28,6 +28,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" @@ -104,6 +105,7 @@ type ProtocolManager struct { odr *LesOdr server *LesServer serverPool *serverPool + clientPool *freeClientPool lesTopic discv5.Topic reqDist *requestDistributor retriever *retrieveManager @@ -226,6 +228,7 @@ func (pm *ProtocolManager) Start(maxPeers int) { if pm.lightSync { go pm.syncer() } else { + pm.clientPool = newFreeClientPool(pm.chainDb, maxPeers, 10000, mclock.System{}) go func() { for range pm.newPeerCh { } @@ -243,6 +246,9 @@ func (pm *ProtocolManager) Stop() { pm.noMorePeers <- struct{}{} close(pm.quitSync) // quits syncer, fetcher + if pm.clientPool != nil { + pm.clientPool.stop() + } // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. @@ -264,7 +270,8 @@ func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgRea // this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { // Ignore maxPeers if this is a trusted peer - if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { + // In server mode we try to check into the client pool after handshake + if pm.lightSync && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { return p2p.DiscTooManyPeers } @@ -282,6 +289,19 @@ func (pm *ProtocolManager) handle(p *peer) error { p.Log().Debug("Light Ethereum handshake failed", "err", err) return err } + + if !pm.lightSync && !p.Peer.Info().Network.Trusted { + addr, ok := p.RemoteAddr().(*net.TCPAddr) + // test peer address is not a tcp address, don't use client pool if can not typecast + if ok { + id := addr.IP.String() + if !pm.clientPool.connect(id, func() { go pm.removePeer(p.id) }) { + return p2p.DiscTooManyPeers + } + defer pm.clientPool.disconnect(id) + } + } + if rw, ok := p.rw.(*meteredMsgReadWriter); ok { rw.Init(p.version) } |