From a5d08c893d61f66d60d8a91216aee5347b78f93e Mon Sep 17 00:00:00 2001 From: Felföldi Zsolt Date: Wed, 21 Jun 2017 03:27:38 -0700 Subject: les: code refactoring (#14416) This commit does various code refactorings: - generalizes and moves the request retrieval/timeout/resend logic out of LesOdr (will be used by a subsequent PR) - reworks the peer management logic so that all services can register with peerSet to get notified about added/dropped peers (also gets rid of the ugly getAllPeers callback in requestDistributor) - moves peerSet, LesOdr, requestDistributor and retrieveManager initialization out of ProtocolManager because I believe they do not really belong there and the whole init process was ugly and ad-hoc --- les/handler.go | 207 ++++++++++++++++++++++----------------------------------- 1 file changed, 79 insertions(+), 128 deletions(-) (limited to 'les/handler.go') diff --git a/les/handler.go b/les/handler.go index 64023af0f..77bc077a2 100644 --- a/les/handler.go +++ b/les/handler.go @@ -102,7 +102,9 @@ type ProtocolManager struct { odr *LesOdr server *LesServer serverPool *serverPool + lesTopic discv5.Topic reqDist *requestDistributor + retriever *retrieveManager downloader *downloader.Downloader fetcher *lightFetcher @@ -123,12 +125,12 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing - wg sync.WaitGroup + wg *sync.WaitGroup } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) { +func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, @@ -136,15 +138,20 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network blockchain: blockchain, chainConfig: chainConfig, chainDb: chainDb, + odr: odr, networkId: networkId, txpool: txpool, txrelay: txrelay, - odr: odr, - peers: newPeerSet(), + peers: peers, newPeerCh: make(chan *peer), - quitSync: make(chan struct{}), + quitSync: quitSync, + wg: wg, noMorePeers: make(chan struct{}), } + if odr != nil { + manager.retriever = odr.retriever + manager.reqDist = odr.retriever.dist + } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { @@ -202,84 +209,22 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) + manager.peers.notify((*downloaderPeerNotify)(manager)) + manager.fetcher = newLightFetcher(manager) } - manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} { - m := make(map[distPeer]struct{}) - peers := manager.peers.AllPeers() - for _, peer := range peers { - m[peer] = struct{}{} - } - return m - }, manager.quitSync) - if odr != nil { - odr.removePeer = removePeer - odr.reqDist = manager.reqDist - } - - /*validator := func(block *types.Block, parent *types.Block) error { - return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false) - } - heighter := func() uint64 { - return chainman.LastBlockNumberU64() - } - manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer) - */ return manager, nil } +// removePeer initiates disconnection from a peer by removing it from the peer set func (pm *ProtocolManager) removePeer(id string) { - // Short circuit if the peer was already removed - peer := pm.peers.Peer(id) - if peer == nil { - return - } - log.Debug("Removing light Ethereum peer", "peer", id) - if err := pm.peers.Unregister(id); err != nil { - if err == errNotRegistered { - return - } - } - // Unregister the peer from the downloader and Ethereum peer set - if pm.lightSync { - pm.downloader.UnregisterPeer(id) - if pm.txrelay != nil { - pm.txrelay.removePeer(id) - } - if pm.fetcher != nil { - pm.fetcher.removePeer(peer) - } - } - // Hard disconnect at the networking layer - if peer != nil { - peer.Peer.Disconnect(p2p.DiscUselessPeer) - } + pm.peers.Unregister(id) } -func (pm *ProtocolManager) Start(srvr *p2p.Server) { - var topicDisc *discv5.Network - if srvr != nil { - topicDisc = srvr.DiscV5 - } - lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) +func (pm *ProtocolManager) Start() { if pm.lightSync { - // start sync handler - if srvr != nil { // srvr is nil during testing - pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) - pm.odr.serverPool = pm.serverPool - pm.fetcher = newLightFetcher(pm) - } go pm.syncer() } else { - if topicDisc != nil { - go func() { - logger := log.New("topic", lesTopic) - logger.Info("Starting topic registration") - defer logger.Info("Terminated topic registration") - - topicDisc.RegisterTopic(lesTopic, pm.quitSync) - }() - } go func() { for range pm.newPeerCh { } @@ -342,65 +287,10 @@ func (pm *ProtocolManager) handle(p *peer) error { }() // Register the peer in the downloader. If the downloader considers it banned, we disconnect if pm.lightSync { - requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { - reqID := getNextReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == p - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pm.reqDist.queue(rq) - if !ok { - return ErrNoPeers - } - return nil - } - requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { - reqID := getNextReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == p - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pm.reqDist.queue(rq) - if !ok { - return ErrNoPeers - } - return nil - } - if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, - requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { - return err - } - if pm.txrelay != nil { - pm.txrelay.addPeer(p) - } - p.lock.Lock() head := p.headInfo p.lock.Unlock() if pm.fetcher != nil { - pm.fetcher.addPeer(p) pm.fetcher.announce(p, head) } @@ -926,7 +816,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } if deliverMsg != nil { - err := pm.odr.Deliver(p, deliverMsg) + err := pm.retriever.deliver(p, deliverMsg) if err != nil { p.responseErrors++ if p.responseErrors > maxResponseErrors { @@ -946,3 +836,64 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo { Head: self.blockchain.LastBlockHash(), } } + +// downloaderPeerNotify implements peerSetNotify +type downloaderPeerNotify ProtocolManager + +func (d *downloaderPeerNotify) registerPeer(p *peer) { + pm := (*ProtocolManager)(d) + + requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { + reqID := genReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil + } + requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { + reqID := genReqID() + rq := &distReq{ + getCost: func(dp distPeer) uint64 { + peer := dp.(*peer) + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { + return dp.(*peer) == p + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) + cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) + peer.fcServer.QueueRequest(reqID, cost) + return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } + }, + } + _, ok := <-pm.reqDist.queue(rq) + if !ok { + return ErrNoPeers + } + return nil + } + + pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil) +} + +func (d *downloaderPeerNotify) unregisterPeer(p *peer) { + pm := (*ProtocolManager)(d) + pm.downloader.UnregisterPeer(p.id) +} -- cgit