aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@users.noreply.github.com>2016-12-13 03:46:15 +0800
committerGitHub <noreply@github.com>2016-12-13 03:46:15 +0800
commita98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b (patch)
treefaccd87d6e1634b51f788fa170bc1f03a829ca42 /les/handler.go
parentee445a2ba4013f8b32e4e5386322babf022e5b81 (diff)
parentf12f8a6c14dbaf6e6531cea1b0cf169b851e1894 (diff)
downloaddexon-a98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b.tar.gz
dexon-a98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b.tar.zst
dexon-a98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b.zip
Merge pull request #3413 from zsfelfoldi/light-topic4
les, p2p/discv5: implement server pool, improve peer selection, light fetcher and topic searching
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go109
1 files changed, 53 insertions, 56 deletions
diff --git a/les/handler.go b/les/handler.go
index fdf4e6e8a..b024841f2 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -22,8 +22,8 @@ import (
"errors"
"fmt"
"math/big"
+ "net"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -58,7 +58,7 @@ const (
MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxTxSend = 64 // Amount of transactions to be send per request
- disableClientRemovePeer = true
+ disableClientRemovePeer = false
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -101,10 +101,7 @@ type ProtocolManager struct {
chainDb ethdb.Database
odr *LesOdr
server *LesServer
-
- topicDisc *discv5.Network
- lesTopic discv5.Topic
- p2pServer *p2p.Server
+ serverPool *serverPool
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -157,13 +154,29 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ var entry *poolEntry
peer := manager.newPeer(int(version), networkId, p, rw)
+ if manager.serverPool != nil {
+ addr := p.RemoteAddr().(*net.TCPAddr)
+ entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
+ if entry == nil {
+ return fmt.Errorf("unwanted connection")
+ }
+ }
+ peer.poolEntry = entry
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
- return manager.handle(peer)
+ err := manager.handle(peer)
+ if entry != nil {
+ manager.serverPool.disconnect(entry)
+ }
+ return err
case <-manager.quitSync:
+ if entry != nil {
+ manager.serverPool.disconnect(entry)
+ }
return p2p.DiscQuitting
}
},
@@ -192,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
- manager.fetcher = newLightFetcher(manager)
}
if odr != nil {
@@ -222,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) {
glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
if pm.lightSync {
pm.downloader.UnregisterPeer(id)
- pm.odr.UnregisterPeer(peer)
if pm.txrelay != nil {
pm.txrelay.removePeer(id)
}
+ if pm.fetcher != nil {
+ pm.fetcher.removePeer(peer)
+ }
}
if err := pm.peers.Unregister(id); err != nil {
glog.V(logger.Error).Infoln("Removal failed:", err)
@@ -236,54 +250,26 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}
-func (pm *ProtocolManager) findServers() {
- if pm.p2pServer == nil || pm.topicDisc == nil {
- return
- }
- glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
- enodes := make(chan string, 100)
- stop := make(chan struct{})
- go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
- go func() {
- added := make(map[string]bool)
- for {
- select {
- case enode := <-enodes:
- if !added[enode] {
- glog.V(logger.Info).Infoln("Found LES server:", enode)
- added[enode] = true
- if node, err := discover.ParseNode(enode); err == nil {
- pm.p2pServer.AddPeer(node)
- }
- }
- case <-stop:
- return
- }
- }
- }()
- select {
- case <-time.After(time.Second * 20):
- case <-pm.quitSync:
- }
- close(stop)
-}
-
func (pm *ProtocolManager) Start(srvr *p2p.Server) {
- pm.p2pServer = srvr
+ var topicDisc *discv5.Network
if srvr != nil {
- pm.topicDisc = srvr.DiscV5
+ topicDisc = srvr.DiscV5
}
- pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
+ lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
// start sync handler
- go pm.findServers()
+ if srvr != nil { // srvr is nil during testing
+ pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
+ pm.odr.serverPool = pm.serverPool
+ pm.fetcher = newLightFetcher(pm)
+ }
go pm.syncer()
} else {
- if pm.topicDisc != nil {
+ if topicDisc != nil {
go func() {
- glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
- pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
- glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
+ glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic))
+ topicDisc.RegisterTopic(lesTopic, pm.quitSync)
+ glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic))
}()
}
go func() {
@@ -352,13 +338,13 @@ func (pm *ProtocolManager) handle(p *peer) error {
glog.V(logger.Debug).Infof("LES: register peer %v", p.id)
if pm.lightSync {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
- reqID := pm.odr.getNextReqID()
+ reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
}
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
- reqID := pm.odr.getNextReqID()
+ reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
@@ -367,12 +353,21 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
return err
}
- pm.odr.RegisterPeer(p)
if pm.txrelay != nil {
pm.txrelay.addPeer(p)
}
- pm.fetcher.notify(p, nil)
+ p.lock.Lock()
+ head := p.headInfo
+ p.lock.Unlock()
+ if pm.fetcher != nil {
+ pm.fetcher.addPeer(p)
+ pm.fetcher.announce(p, head)
+ }
+
+ if p.poolEntry != nil {
+ pm.serverPool.registered(p.poolEntry)
+ }
}
stop := make(chan struct{})
@@ -454,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err)
}
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
- pm.fetcher.notify(p, &req)
+ if pm.fetcher != nil {
+ go pm.fetcher.announce(p, &req)
+ }
case GetBlockHeadersMsg:
glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
@@ -552,8 +549,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.GotReply(resp.ReqID, resp.BV)
- if pm.fetcher.requestedID(resp.ReqID) {
- pm.fetcher.deliverHeaders(resp.ReqID, resp.Headers)
+ if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
+ pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
if err != nil {